Skip to content

Commit

Permalink
Database improvements. Methods are now normalized into their own table. Units are now determined for every data type.
Browse files Browse the repository at this point in the history
  • Loading branch information
dbeaudoinfortin committed Jun 11, 2024
1 parent b7f1093 commit a974aa1
Show file tree
Hide file tree
Showing 16 changed files with 216 additions and 111 deletions.
23 changes: 18 additions & 5 deletions src/main/java/com/dbf/naps/data/loader/DataMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,26 @@

public interface DataMapper {

@Select("SELECT id from naps.pollutants where name = #{name} AND method = #{method}")
public Integer getPollutantID(String name, String method);
@Select("SELECT id from naps.pollutants where name = #{name}")
public Integer getPollutantID(String name);

@Insert("INSERT into naps.pollutants (name, method)"
+ " values (#{name}, #{method})"
@Insert("INSERT into naps.pollutants (name)"
+ " values (#{name})"
+ " ON CONFLICT DO NOTHING;")
public int insertPollutant(String name, String method);
public int insertPollutant(String name);

/**
 * Looks up the ID of an existing naps.methods row matching the given dataset,
 * report type, method and units. Both {@code method} and {@code units} may be
 * null (e.g. continuous compounds with no unit mapping, or integrated file
 * types registered without units), so each is compared with {@code IS NULL}
 * instead of {@code =} when absent — a plain equality test never matches a
 * NULL column, which would make this lookup fail even after insertMethod
 * successfully stored the row.
 *
 * @return the matching id, or null when no row matches
 */
@Select("<script> "
	+ "SELECT id from naps.methods where dataset = #{dataset} and report_type = #{reportType} and method "
	+ "<if test=\"method != null\">= #{method}</if>"
	+ "<if test=\"method == null\">IS NULL</if>"
	+ " and units "
	+ "<if test=\"units != null\">= #{units}</if>"
	+ "<if test=\"units == null\">IS NULL</if>"
	+ "</script>")
public Integer getMethodID(String dataset, String reportType, String method, String units);

/**
 * Inserts a row into naps.methods, silently doing nothing when an equivalent
 * row already exists (ON CONFLICT DO NOTHING). Callers re-select afterwards
 * via {@link #getMethodID} to obtain the id.
 */
@Insert("INSERT into naps.methods (dataset, report_type, method, units)"
	+ " values (#{dataset}, #{reportType}, #{method}, #{units})"
	+ " ON CONFLICT DO NOTHING;")
public int insertMethod(String dataset, String reportType, String method, String units);

@Select("SELECT id from naps.sites where NAPS_id = #{NAPSId}")
public Integer getSiteID(Integer NAPSId);
Expand Down
20 changes: 9 additions & 11 deletions src/main/java/com/dbf/naps/data/loader/DataRecord.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class DataRecord {

private Integer siteId;
private Integer pollutantId;
private Integer methodId;
private Date datetime;
private Integer year;
private Integer month;
Expand All @@ -15,17 +16,6 @@ public class DataRecord {
private BigDecimal data;

public DataRecord() {}

public DataRecord(DataRecord other) {
this.siteId = other.siteId;
this.pollutantId = other.pollutantId;
this.datetime = other.datetime;
this.year = other.year;
this.month = other.month;
this.day = other.day;
this.dayOfWeek = other.dayOfWeek;
this.data = other.data;
}

public Integer getSiteId() {
return siteId;
Expand Down Expand Up @@ -95,5 +85,13 @@ public BigDecimal getData() {

public void setData(BigDecimal data) {
this.data = data;
}

/**
 * @return the ID of the measurement method for this record (foreign key into
 *         naps.methods, resolved by the loader), or null if not yet assigned
 */
public Integer getMethodId() {
return methodId;
}

/**
 * @param methodId the naps.methods row ID to associate with this record; may be null
 */
public void setMethodId(Integer methodId) {
this.methodId = methodId;
}
}
40 changes: 31 additions & 9 deletions src/main/java/com/dbf/naps/data/loader/FileLoadRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ public abstract class FileLoadRunner implements Runnable {
//Holds a mapping of NAPSID to SiteID, shared across threads
private static final Map<Integer, Integer> siteIDLookup = new ConcurrentHashMap<Integer, Integer>(300);

//Holds a mapping of lookup key (Compound_Method) to PollutantID, shared across threads
private static final Map<String, Integer> pollutantIDLookup = new ConcurrentHashMap<String, Integer>(20);
//Holds a mapping of PollutantName to PollutantID, shared across threads
private static final Map<String, Integer> pollutantIDLookup = new ConcurrentHashMap<String, Integer>(200);

//Holds a mapping of lookupKey (dataset, report_type, method) to MethodID, shared across threads
private static final Map<String, Integer> methodIDLookup = new ConcurrentHashMap<String, Integer>(50);

private final int threadId;
private final LoaderOptions config;
Expand Down Expand Up @@ -100,25 +103,44 @@ protected Integer getSiteID(String napsID, long recordNumber) {
}
}

protected Integer getPollutantID(String rawPollutantName, String method) {
String lookupKey = rawPollutantName + "_" + method;
protected Integer getPollutantID(String rawPollutantName) {

//If one thread stamps overrides the data of another it's no big deal
return pollutantIDLookup.computeIfAbsent(lookupKey, pollutantName -> {
return pollutantIDLookup.computeIfAbsent(rawPollutantName, pollutantName -> {
Integer pollutantID = null;
pollutantName = PollutantMapping.lookupPollutantName(pollutantName);
//May or may not insert, let the DB manage contention
try(SqlSession session = sqlSessionFactory.openSession(true)) {
DataMapper mapper = session.getMapper(DataMapper.class);
mapper.insertPollutant(pollutantName, method);
pollutantID = mapper.getPollutantID(pollutantName, method);
mapper.insertPollutant(pollutantName);
pollutantID = mapper.getPollutantID(pollutantName);
}
if(null == pollutantID) {
throw new IllegalArgumentException("Could not find matching Pollutant ID for compound " + rawPollutantName + ", and method " + method);
if(null == pollutantID) { //Sanity check, should be impossible
throw new IllegalArgumentException("Could not find a matching pollutant ID with name " + rawPollutantName);
}
return pollutantID;
});
}

/**
 * Resolves the database ID for a measurement method, lazily creating the
 * naps.methods row on first use. Resolved IDs are cached in a static
 * ConcurrentHashMap shared across all loader threads.
 *
 * @param dataset    dataset name (callers in this file pass "Continuous" or "Integrated")
 * @param reportType the report/file type (the compound name for continuous data,
 *                   the integrated file type otherwise)
 * @param method     the specific method; may be null
 * @param units      the measurement units; may be null
 * @return the id of the matching naps.methods row, never null
 * @throws IllegalArgumentException if no matching row is found after the insert attempt
 */
protected Integer getMethodID(String dataset, String reportType, String method, String units) {
//Null method/units are embedded as the literal string "null" in the cache key
String lookupKey = dataset + "_" + reportType + "_" + method + "_" + units;

//If one thread overwrites the cached value of another it's no big deal
return methodIDLookup.computeIfAbsent(lookupKey, key -> {
Integer methodID = null;

//May or may not insert, let the DB manage contention
try(SqlSession session = sqlSessionFactory.openSession(true)) {
DataMapper mapper = session.getMapper(DataMapper.class);
mapper.insertMethod(dataset, reportType, method, units);
//NOTE(review): the mapper's SELECT compares units with '=' — confirm it still matches when units is NULL
methodID = mapper.getMethodID(dataset, reportType, method, units);
}
if(null == methodID) { //Sanity check, should be impossible
throw new IllegalArgumentException("Could not find a matching method ID using lookup key: " + key);
}
return methodID;
});
}

public int getThreadId() {
return threadId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@

public interface ContinuousDataMapper {

@Insert("INSERT into naps.continuous_data (site_id, pollutant_id, date_time, year, month, day, hour, day_of_week, data)"
+ " values (#{siteId}, #{pollutantId}, #{datetime}, #{year}, #{month}, #{day}, #{hour}, #{dayOfWeek}, #{data})"
@Insert("INSERT into naps.continuous_data (site_id, pollutant_id, method_id, date_time, year, month, day, hour, day_of_week, data)"
+ " values (#{siteId}, #{pollutantId}, #{methodId}, #{datetime}, #{year}, #{month}, #{day}, #{hour}, #{dayOfWeek}, #{data})"
+ " ON CONFLICT DO NOTHING;")
public int insertContinuousData(Integer siteId, String pollutantId, Date datetime, Integer year, Integer month, Integer day, Integer hour, String dayOfWeek, BigDecimal data);
public int insertContinuousData(Integer siteId, String pollutantId, String methodId, Date datetime, Integer year, Integer month, Integer day, Integer hour, String dayOfWeek, BigDecimal data);

@Insert({"<script>",
"INSERT into naps.continuous_data (site_id, pollutant_id, date_time, year, month, day, hour, day_of_week, data) "
"INSERT into naps.continuous_data (site_id, pollutant_id, method_id, date_time, year, month, day, hour, day_of_week, data) "
+ "values ",
"<foreach collection='dataRecords' item='record' index='index' open='(' separator = '),(' close=')' >"
+ "#{record.siteId}, #{record.pollutantId}, #{record.datetime}, #{record.year}, #{record.month}, #{record.day}, #{record.hour}, #{record.dayOfWeek}, #{record.data}"
+ "#{record.siteId}, #{record.pollutantId}, #{record.methodId}, #{record.datetime}, #{record.year}, #{record.month}, #{record.day}, #{record.hour}, #{record.dayOfWeek}, #{record.data}"
+ "</foreach>"
+ " ON CONFLICT DO NOTHING;",
"</script>"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public class ContinuousFileLoadRunner extends FileLoadRunner {

private static final CSVFormat csvFormat;


private static final Long ONE_HOUR_MS = 60*60*1000L;

static {
Expand All @@ -46,12 +45,11 @@ public ContinuousFileLoadRunner(int threadId, LoaderOptions config, SqlSessionFa
super(threadId, config, sqlSessionFactory, rawFile);
}


@Override
public void processFile() throws Exception {
log.info(getThreadId() + ":: Starting CSV parsing for file " + getRawFile() + ".");
List<ContinuousDataRecord> records = new ArrayList<ContinuousDataRecord>(100);

//Load all the rows into memory. Let's assume we don't run out of memory. :)
try (Reader reader = new FileReader(getRawFile(), StandardCharsets.ISO_8859_1); CSVParser parser = csvFormat.parse(reader)) {
for(CSVRecord line : parser) {
Expand All @@ -62,25 +60,39 @@ public void processFile() throws Exception {
}

//More sanity checks, the line needs to start with a known pollutant
String compoudString = line.get(0).replace(".", ""); //PM2.5 -> PM25
String compoudString = line.get(0).replace(".", "").toUpperCase(); //PM2.5 -> PM25
if(!Compound.contains(compoudString)) continue;

boolean isPM25 = compoudString.equalsIgnoreCase(Compound.PM25.name());
boolean isPM25 = compoudString.equals(Compound.PM25.name());
if(!((isPM25 && line.size() == 32) || (!isPM25 && line.size() == 31))) {
throw new IllegalArgumentException("Wrong number of columns (" + line.size() + ") on row " + line.getRecordNumber() + ". Expected " + (isPM25? "32.":"31."));
}

int columnOffset = isPM25 ? 1:0;
String method = "CONT";
String method = null;
if(isPM25) {
//Append the PM25-specific method to the overall method
method += "_" + line.get(1);
//Only PM25 has a specific method
method = line.get(1);
}

//The units are expected to be consistent for each compound type
String units = null;
if (compoudString.equals(Compound.CO.name())) {
units = "ppm";
} else if (compoudString.equals(Compound.SO2.name()) || compoudString.equals(Compound.O3.name())) {
units = "ppb";
} else if(isPM25 || compoudString.equals(Compound.PM10.name())
|| compoudString.equals(Compound.NOX.name())
|| compoudString.equals(Compound.NO2.name())
|| compoudString.equals(Compound.NO.name())) {
units = "µg/m³";
}

//We create 24 records per CSV line, 1 per hour
for(int hour = 0; hour < 24; hour++) {
ContinuousDataRecord record = new ContinuousDataRecord();
record.setPollutantId(getPollutantID(compoudString, method));
record.setPollutantId(getPollutantID(compoudString));
record.setMethodId(getMethodID("Continuous", compoudString, method, units));
record.setSiteId(getSiteID(
line.get(1 + columnOffset),
line.get(2 + columnOffset),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
public interface IntegratedDataMapper {

@Insert({"<script>",
"INSERT into naps.integrated_data (site_id, pollutant_id, date_time, year, month, day, fine, cartridge, media, day_of_week,"
"INSERT into naps.integrated_data (site_id, pollutant_id, method_id, date_time, year, month, day, fine, cartridge, media, day_of_week,"
+ "sample_mass, sample_vol, sample_duration, tsp, data) "
+ "values ",
"<foreach collection='dataRecords' item='record' index='index' open='(' separator = '),(' close=')' >"
+ "#{record.siteId}, #{record.pollutantId}, #{record.datetime}, #{record.year}, #{record.month}, #{record.day}, "
+ "#{record.siteId}, #{record.pollutantId}, #{record.methodId}, #{record.datetime}, #{record.year}, #{record.month}, #{record.day}, "
+ "#{record.fine, typeHandler=org.apache.ibatis.type.BooleanTypeHandler}, #{record.cartridge}, #{record.media}, #{record.dayOfWeek}, "
+ "#{record.mass}, #{record.volume}, #{record.duration}, #{record.tsp}, #{record.data}"
+ "</foreach>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,24 @@ public class NAPSIntegratedDataLoader extends NAPSDataLoader {
mappings.add(new IntegratedRunnerMapping(CFFileLoadRunner.class, "DICHOT", "_DICH.XLS"));
mappings.add(new IntegratedRunnerMapping(CFFileLoadRunner.class, "PM2.5", "_PART25.XLS"));

mappings.add(new IntegratedRunnerMapping(SampleMetaDataFileLoadRunner.class, "PAH", "_PAH.XLS"));
mappings.add(new IntegratedRunnerMapping(SampleMetaDataFileLoadRunner.class, "HCB", "_HCB.XLS"));
mappings.add(new IntegratedRunnerMapping(SampleMetaDataFileLoadRunner.class, "PAH", "_PAH.XLS", "ng/m³"));
mappings.add(new IntegratedRunnerMapping(SampleMetaDataFileLoadRunner.class, "HCB", "_HCB.XLS", "ng/m³"));
mappings.add(new IntegratedRunnerMapping(SampleMetaDataFileLoadRunner.class, "VOC", "_VOC.XLS"));
mappings.add(new IntegratedRunnerMapping(SampleMetaDataFileLoadRunner.class, "VOC", "_VOCS.XLS")); //One file is mis-named :)
mappings.add(new IntegratedRunnerMapping(SampleMetaDataFileLoadRunner.class, "PCDD", "_PCDD.XLSX"));
mappings.add(new IntegratedRunnerMapping(SampleMetaDataFileLoadRunner.class, "PCDD", "_PCDD.XLS"));
mappings.add(new IntegratedRunnerMapping(SampleMetaDataFileLoadRunner.class, "PCB", "_PCB.XLS"));
mappings.add(new IntegratedRunnerMapping(SampleMetaDataFileLoadRunner.class, "PCDD", "_PCDD.XLSX", "pg/m³"));
mappings.add(new IntegratedRunnerMapping(SampleMetaDataFileLoadRunner.class, "PCDD", "_PCDD.XLS", "pg/m³"));
mappings.add(new IntegratedRunnerMapping(SampleMetaDataFileLoadRunner.class, "PCB", "_PCB.XLS", "pg/m³"));

mappings.add(new IntegratedRunnerMapping(SpeciationFileLoadRunner.class, "CARB", "_CARB.XLS"));
mappings.add(new IntegratedRunnerMapping(SpeciationFileLoadRunner.class, "IC", "_IC.XLS"));
mappings.add(new IntegratedRunnerMapping(SpeciationFileLoadRunner.class, "ICPMS", "_ICPMS.XLS"));
mappings.add(new IntegratedRunnerMapping(SpeciationFileLoadRunner.class, "ICPMS", "_ICPMS.XLS", "ng/m³"));
mappings.add(new IntegratedRunnerMapping(SpeciationFileLoadRunner.class, "NA", "_NA.XLS"));
mappings.add(new IntegratedRunnerMapping(SpeciationFileLoadRunner.class, "NH4", "_NH4.XLS"));
mappings.add(new IntegratedRunnerMapping(SpeciationFileLoadRunner.class, "SPEC", "_SPEC.XLS"));
mappings.add(new IntegratedRunnerMapping(SpeciationFileLoadRunner.class, "WICPMS", "_WICPMS.XLS"));
mappings.add(new IntegratedRunnerMapping(SpeciationFileLoadRunner.class, "LEV", "_LEV.XLS"));
mappings.add(new IntegratedRunnerMapping(SpeciationFileLoadRunner.class, "WICPMS", "_WICPMS.XLS", "ng/m³"));
mappings.add(new IntegratedRunnerMapping(SpeciationFileLoadRunner.class, "LEV", "_LEV.XLS", "ng/m³"));

mappings.add(new IntegratedRunnerMapping(XLSXFileLoadRunner.class, "PAH", Pattern.compile("S[0-9]+_PAH_[0-9]{4}(_EN)?\\.XLSX"))); //Match S90121_PAH_2010.XLSX
mappings.add(new IntegratedRunnerMapping(XLSXFileLoadRunner.class, "PAH", Pattern.compile("S[0-9]+_PAH_[0-9]{4}(_EN)?\\.XLSX"), "ng/m³")); //Match S90121_PAH_2010.XLSX
mappings.add(new IntegratedRunnerMapping(XLSXFileLoadRunner.class, "PM2.5", Pattern.compile("S[0-9]+_PM25_[0-9]{4}(_EN)?\\.XLSX"))); //Match S40103_PM25_2010.XLSX
mappings.add(new IntegratedRunnerMapping(XLSXFileLoadRunner.class, "PM2.5-10", Pattern.compile("S[0-9]+_PM25\\-10_[0-9]{4}(_EN)?\\.XLSX"))); //Match S30113_PM25-10_2010.XLSX

Expand Down Expand Up @@ -101,8 +101,8 @@ protected Collection<Runnable> processFile(File dataFile) {
for(IntegratedRunnerMapping mapping : mappings) {
if((null != mapping.getFileNameMatch() && fileName.endsWith(mapping.getFileNameMatch()))
|| (null != mapping.getFileNamePattern() && mapping.getFileNamePattern().matcher(fileName).matches())) {
return Collections.singletonList((Runnable) mapping.getRunnerClass().getConstructor(int.class, LoaderOptions.class, SqlSessionFactory.class, File.class, String.class)
.newInstance(getThreadID(), getOptions(), getSqlSessionFactory(), dataFile, mapping.getFileType()));
return Collections.singletonList((Runnable) mapping.getRunnerClass().getConstructor(int.class, LoaderOptions.class, SqlSessionFactory.class, File.class, String.class, String.class)
.newInstance(getThreadID(), getOptions(), getSqlSessionFactory(), dataFile, mapping.getFileType(), mapping.getUnits()));
}
}
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
*/
public class CFFileLoadRunner extends IntegratedFileLoadRunner {

public CFFileLoadRunner(int threadId, LoaderOptions config, SqlSessionFactory sqlSessionFactory, File rawFile, String method) {
super(threadId, config, sqlSessionFactory, rawFile, method);
public CFFileLoadRunner(int threadId, LoaderOptions config, SqlSessionFactory sqlSessionFactory, File rawFile, String method, String units) {
super(threadId, config, sqlSessionFactory, rawFile, method, units);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
public class CarbonylsFileLoadRunner extends VOCFileLoadRunner {

public CarbonylsFileLoadRunner(int threadId, LoaderOptions config, SqlSessionFactory sqlSessionFactory, File rawFile, String method) {
super(threadId, config, sqlSessionFactory, rawFile, method);
public CarbonylsFileLoadRunner(int threadId, LoaderOptions config, SqlSessionFactory sqlSessionFactory, File rawFile, String method, String units) {
super(threadId, config, sqlSessionFactory, rawFile, method, units);
}

//Store these column indexes so we only have to look them up once for the entire sheet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,19 @@ public class IntegratedFileLoadRunner extends FileLoadRunner {
//State held during processing
private ExcelSheet sheet;
private int row;
private int col;
protected String method;
protected String units;

private Integer siteID; //Track the site id to read it only once
private Integer headerRowNumber; //Track when we have reached the real header row
private Integer siteIDColumn; //Track when we have reached the NAPS ID which represents the last column
private final String fileType;


public IntegratedFileLoadRunner(int threadId, LoaderOptions config, SqlSessionFactory sqlSessionFactory, File rawFile, String fileType) {
public IntegratedFileLoadRunner(int threadId, LoaderOptions config, SqlSessionFactory sqlSessionFactory, File rawFile, String fileType, String units) {
super(threadId, config, sqlSessionFactory, rawFile);
this.fileType = fileType;
this.units = units;
}

/**
Expand All @@ -116,9 +118,7 @@ protected List<String> getExcludedSheetNames() {
return DEFAULT_IGNORED_SHEETS;
}

protected void setMethod() {
this.method = "INT_" + fileType;
}
protected void setMethod() {} //By default the method is null, do nothing here.

/**
* Processing of a single sheet of the workbook.
Expand Down Expand Up @@ -250,7 +250,7 @@ protected List<IntegratedDataRecord> processRow(Date date) {

//Data is expected to start on column 2
//Last column is NAPS ID and is ignored
for (int col = 1; col < getLastColumn(); col++) {
for (col = 1; col < getLastColumn(); col++) {
String columnHeader = getSheet().getCellContents(col, getHeaderRowNumber()).trim();
if(isColumnIgnored(columnHeader)) continue;

Expand All @@ -266,6 +266,8 @@ protected List<IntegratedDataRecord> processRow(Date date) {
*
*/
private boolean isColumnIgnored(String columnHeader) {
if(columnHeader.equals("")) return true;

columnHeader = columnHeader.toUpperCase();
//ID is a special case that uses an exact match otherwise we might match
//a compound that either start or end with the letter "id"
Expand Down Expand Up @@ -306,7 +308,8 @@ protected IntegratedDataRecord processSingleRecord(String columnHeader, String c
IntegratedDataRecord record = new IntegratedDataRecord();
record.setDatetime(date);
record.setSiteId(siteID);
record.setPollutantId(getPollutantID(columnHeader, method));
record.setPollutantId(getPollutantID(columnHeader));
record.setMethodId(getMethodID("Integrated", fileType, method, units));

//Ignore empty cells, but not zeros
if(!"N.M.".equals(cellValue)) {
Expand Down Expand Up @@ -379,4 +382,8 @@ protected Integer getLastColumn() {
/**
 * @return the row index currently being processed (processing state held
 *         while iterating the sheet)
 */
protected int getRow() {
return row;
}

/**
 * @return the column index most recently visited by processRow's column loop;
 *         only meaningful while a row is being processed
 */
protected int getColumn() {
return col;
}
}
Loading

0 comments on commit a974aa1

Please sign in to comment.