diff --git a/src/main/java/com/dbf/naps/data/loader/DataMapper.java b/src/main/java/com/dbf/naps/data/loader/DataMapper.java index 94462487f7a..fe9f79ed79e 100644 --- a/src/main/java/com/dbf/naps/data/loader/DataMapper.java +++ b/src/main/java/com/dbf/naps/data/loader/DataMapper.java @@ -6,13 +6,26 @@ public interface DataMapper { - @Select("SELECT id from naps.pollutants where name = #{name} AND method = #{method}") - public Integer getPollutantID(String name, String method); + @Select("SELECT id from naps.pollutants where name = #{name}") + public Integer getPollutantID(String name); - @Insert("INSERT into naps.pollutants (name, method)" - + " values (#{name}, #{method})" + @Insert("INSERT into naps.pollutants (name)" + + " values (#{name})" + " ON CONFLICT DO NOTHING;") - public int insertPollutant(String name, String method); + public int insertPollutant(String name); + + @Select("") + public Integer getMethodID(String dataset, String reportType, String method, String units); + + @Insert("INSERT into naps.methods (dataset, report_type, method, units)" + + " values (#{dataset}, #{reportType}, #{method}, #{units})" + + " ON CONFLICT DO NOTHING;") + public int insertMethod(String dataset, String reportType, String method, String units); @Select("SELECT id from naps.sites where NAPS_id = #{NAPSId}") public Integer getSiteID(Integer NAPSId); diff --git a/src/main/java/com/dbf/naps/data/loader/DataRecord.java b/src/main/java/com/dbf/naps/data/loader/DataRecord.java index 40e917d35a4..35f83ae2731 100644 --- a/src/main/java/com/dbf/naps/data/loader/DataRecord.java +++ b/src/main/java/com/dbf/naps/data/loader/DataRecord.java @@ -7,6 +7,7 @@ public class DataRecord { private Integer siteId; private Integer pollutantId; + private Integer methodId; private Date datetime; private Integer year; private Integer month; @@ -15,17 +16,6 @@ public class DataRecord { private BigDecimal data; public DataRecord() {} - - public DataRecord(DataRecord other) { - this.siteId = other.siteId; - this.pollutantId = other.pollutantId; - this.datetime = other.datetime; - this.year = other.year; - this.month = other.month; - this.day = other.day; - this.dayOfWeek = other.dayOfWeek; - this.data = other.data; - } public Integer getSiteId() { return siteId; @@ -95,5 +85,13 @@ public BigDecimal getData() { public void setData(BigDecimal data) { this.data = data; + } + + public Integer getMethodId() { + return methodId; + } + + public void setMethodId(Integer methodId) { + this.methodId = methodId; } } diff --git a/src/main/java/com/dbf/naps/data/loader/FileLoadRunner.java b/src/main/java/com/dbf/naps/data/loader/FileLoadRunner.java index 4eb7ab1c364..b0c704bb9f3 100644 --- a/src/main/java/com/dbf/naps/data/loader/FileLoadRunner.java +++ b/src/main/java/com/dbf/naps/data/loader/FileLoadRunner.java @@ -19,8 +19,11 @@ public abstract class FileLoadRunner implements Runnable { //Holds a mapping of NAPSID to SiteID, shared across threads private static final Map siteIDLookup = new ConcurrentHashMap(300); - //Holds a mapping of lookup key (Compound_Method) to PollutantID, shared across threads - private static final Map pollutantIDLookup = new ConcurrentHashMap(20); + //Holds a mapping of PollutantName to PollutantID, shared across threads + private static final Map pollutantIDLookup = new ConcurrentHashMap(200); + + //Holds a mapping of lookupKey (dataset, report_type, method) to MethodID, shared across threads + private static final Map methodIDLookup = new ConcurrentHashMap(50); private final int threadId; private final LoaderOptions config; @@ -100,25 +103,44 @@ protected Integer getSiteID(String napsID, long recordNumber) { } } - protected Integer getPollutantID(String rawPollutantName, String method) { - String lookupKey = rawPollutantName + "_" + method; + protected Integer getPollutantID(String rawPollutantName) { //If one thread stamps overrides the data of another it's no big deal - return pollutantIDLookup.computeIfAbsent(lookupKey, pollutantName -> { + return pollutantIDLookup.computeIfAbsent(rawPollutantName, pollutantName -> { Integer pollutantID = null; pollutantName = PollutantMapping.lookupPollutantName(pollutantName); //May or may not insert, let the DB manage contention try(SqlSession session = sqlSessionFactory.openSession(true)) { DataMapper mapper = session.getMapper(DataMapper.class); - mapper.insertPollutant(pollutantName, method); - pollutantID = mapper.getPollutantID(pollutantName, method); + mapper.insertPollutant(pollutantName); + pollutantID = mapper.getPollutantID(pollutantName); } - if(null == pollutantID) { - throw new IllegalArgumentException("Could not find matching Pollutant ID for compound " + rawPollutantName + ", and method " + method); + if(null == pollutantID) { //Sanity check, should be impossible + throw new IllegalArgumentException("Could not find a matching pollutant ID with name " + rawPollutantName); } return pollutantID; }); } + + protected Integer getMethodID(String dataset, String reportType, String method, String units) { + String lookupKey = dataset + "_" + reportType + "_" + method + "_" + units; + + //If one thread stamps overrides the data of another it's no big deal + return methodIDLookup.computeIfAbsent(lookupKey, key -> { + Integer methodID = null; + + //May or may not insert, let the DB manage contention + try(SqlSession session = sqlSessionFactory.openSession(true)) { + DataMapper mapper = session.getMapper(DataMapper.class); + mapper.insertMethod(dataset, reportType, method, units); + methodID = mapper.getMethodID(dataset, reportType, method, units); + } + if(null == methodID) { //Sanity check, should be impossible + throw new IllegalArgumentException("Could not find a matching method ID using lookup key: " + key); + } + return methodID; + }); + } public int getThreadId() { return threadId; diff --git a/src/main/java/com/dbf/naps/data/loader/continuous/ContinuousDataMapper.java b/src/main/java/com/dbf/naps/data/loader/continuous/ContinuousDataMapper.java index 9f06c3cf488..9520f07a1d9 100644 --- a/src/main/java/com/dbf/naps/data/loader/continuous/ContinuousDataMapper.java +++ b/src/main/java/com/dbf/naps/data/loader/continuous/ContinuousDataMapper.java @@ -8,16 +8,16 @@ public interface ContinuousDataMapper { - @Insert("INSERT into naps.continuous_data (site_id, pollutant_id, date_time, year, month, day, hour, day_of_week, data)" - + " values (#{siteId}, #{pollutantId}, #{datetime}, #{year}, #{month}, #{day}, #{hour}, #{dayOfWeek}, #{data})" + @Insert("INSERT into naps.continuous_data (site_id, pollutant_id, method_id, date_time, year, month, day, hour, day_of_week, data)" + + " values (#{siteId}, #{pollutantId}, #{methodId}, #{datetime}, #{year}, #{month}, #{day}, #{hour}, #{dayOfWeek}, #{data})" + " ON CONFLICT DO NOTHING;") - public int insertContinuousData(Integer siteId, String pollutantId, Date datetime, Integer year, Integer month, Integer day, Integer hour, String dayOfWeek, BigDecimal data); + public int insertContinuousData(Integer siteId, String pollutantId, String methodId, Date datetime, Integer year, Integer month, Integer day, Integer hour, String dayOfWeek, BigDecimal data); @Insert({""}) diff --git a/src/main/java/com/dbf/naps/data/loader/continuous/ContinuousFileLoadRunner.java b/src/main/java/com/dbf/naps/data/loader/continuous/ContinuousFileLoadRunner.java index dc62904009e..f399d421dd7 100644 --- a/src/main/java/com/dbf/naps/data/loader/continuous/ContinuousFileLoadRunner.java +++ b/src/main/java/com/dbf/naps/data/loader/continuous/ContinuousFileLoadRunner.java @@ -27,7 +27,6 @@ public class ContinuousFileLoadRunner extends FileLoadRunner { private static final CSVFormat csvFormat; - private static final Long ONE_HOUR_MS = 60*60*1000L; static { @@ -46,12 +45,11 @@ public ContinuousFileLoadRunner(int threadId, LoaderOptions config, SqlSessionFa super(threadId, config, sqlSessionFactory, rawFile); } - @Override public void processFile() throws Exception { log.info(getThreadId() + ":: Starting CSV parsing for file " + getRawFile() + "."); List records = new ArrayList(100); - + //Load all the rows into memory. Let's assume we don't run out of memory. :) try (Reader reader = new FileReader(getRawFile(), StandardCharsets.ISO_8859_1); CSVParser parser = csvFormat.parse(reader)) { for(CSVRecord line : parser) { @@ -62,25 +60,39 @@ public void processFile() throws Exception { } //More sanity checks, the line needs to start with a known pollutant - String compoudString = line.get(0).replace(".", ""); //PM2.5 -> PM25 + String compoudString = line.get(0).replace(".", "").toUpperCase(); //PM2.5 -> PM25 if(!Compound.contains(compoudString)) continue; - boolean isPM25 = compoudString.equalsIgnoreCase(Compound.PM25.name()); + boolean isPM25 = compoudString.equals(Compound.PM25.name()); if(!((isPM25 && line.size() == 32) || (!isPM25 && line.size() == 31))) { throw new IllegalArgumentException("Wrong number of columns (" + line.size() + ") on row " + line.getRecordNumber() + ". Expected " + (isPM25? "32.":"31.")); } int columnOffset = isPM25 ? 1:0; - String method = "CONT"; + String method = null; if(isPM25) { - //Append the PM25-specific method to the overall method - method += "_" + line.get(1); + //Only PM25 has a specific method + method = line.get(1); + } + + //The units are expected to be consistent for each compound type + String units = null; + if (compoudString.equals(Compound.CO.name())) { + units = "ppm"; + } else if (compoudString.equals(Compound.SO2.name()) || compoudString.equals(Compound.O3.name())) { + units = "ppb"; + } else if(isPM25 || compoudString.equals(Compound.PM10.name()) + || compoudString.equals(Compound.NOX.name()) + || compoudString.equals(Compound.NO2.name()) + || compoudString.equals(Compound.NO.name())) { + units = "µg/m³"; } //We create 24 records per CSV line, 1 per hour for(int hour = 0; hour < 24; hour++) { ContinuousDataRecord record = new ContinuousDataRecord(); - record.setPollutantId(getPollutantID(compoudString, method)); + record.setPollutantId(getPollutantID(compoudString)); + record.setMethodId(getMethodID("Continuous", compoudString, method, units)); record.setSiteId(getSiteID( line.get(1 + columnOffset), line.get(2 + columnOffset), diff --git a/src/main/java/com/dbf/naps/data/loader/integrated/IntegratedDataMapper.java b/src/main/java/com/dbf/naps/data/loader/integrated/IntegratedDataMapper.java index 934ca1460d7..4f92037a143 100644 --- a/src/main/java/com/dbf/naps/data/loader/integrated/IntegratedDataMapper.java +++ b/src/main/java/com/dbf/naps/data/loader/integrated/IntegratedDataMapper.java @@ -7,11 +7,11 @@ public interface IntegratedDataMapper { @Insert({"