Skip to content

Commit

Permalink
reworked the method definition for pollutants to make the method its …
Browse files Browse the repository at this point in the history
…own column. Added support for all of the speciation integrated datasets: CARB, IC, ICPMS, NA, NH4, SPEC, WICPMS, and LEV.
  • Loading branch information
dbeaudoinfortin committed May 28, 2024
1 parent a046166 commit 0761070
Show file tree
Hide file tree
Showing 13 changed files with 183 additions and 37 deletions.
10 changes: 5 additions & 5 deletions src/main/java/com/dbf/naps/data/loader/DataMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

public interface DataMapper {

@Select("SELECT id from naps.pollutants where name = #{name}")
public Integer getPollutantID(String name);
@Select("SELECT id from naps.pollutants where name = #{name} AND method = #{method}")
public Integer getPollutantID(String name, String method);

@Insert("INSERT into naps.pollutants (name)"
+ " values (#{name})"
@Insert("INSERT into naps.pollutants (name, method)"
+ " values (#{name}, #{method})"
+ " ON CONFLICT DO NOTHING;")
public int insertPollutant(String name);
public int insertPollutant(String name, String method);

@Select("SELECT id from naps.sites where NAPS_id = #{NAPSId}")
public Integer getSiteID(Integer NAPSId);
Expand Down
14 changes: 8 additions & 6 deletions src/main/java/com/dbf/naps/data/loader/FileLoadRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public abstract class FileLoadRunner implements Runnable {
//Holds a mapping of NAPSID to SiteID, shared across threads
private static final Map<Integer, Integer> siteIDLookup = new ConcurrentHashMap<Integer, Integer>(300);

//Holds a mapping of Compound to PollutantID, shared across threads
//Holds a mapping of lookup key (Compound_Method) to PollutantID, shared across threads
private static final Map<String, Integer> pollutantIDLookup = new ConcurrentHashMap<String, Integer>(20);

private final int threadId;
Expand Down Expand Up @@ -99,18 +99,20 @@ protected Integer getSiteID(String napsID, long recordNumber) {
}
}

protected Integer getPollutantID(String compound) {
protected Integer getPollutantID(String compound, String method) {
String lookupKey = compound + "_" + method;

//If one thread stamps overrides the data of another it's no big deal
return pollutantIDLookup.computeIfAbsent(compound, key -> {
return pollutantIDLookup.computeIfAbsent(lookupKey, k -> {
Integer pollutantID = null;
//May or may not insert, let the DB manage contention
try(SqlSession session = sqlSessionFactory.openSession(true)) {
DataMapper mapper = session.getMapper(DataMapper.class);
mapper.insertPollutant(compound);
pollutantID = mapper.getPollutantID(compound);
mapper.insertPollutant(compound, method);
pollutantID = mapper.getPollutantID(compound, method);
}
if(null == pollutantID) {
throw new IllegalArgumentException("Could not find matching Pollutant ID for compound " + compound);
throw new IllegalArgumentException("Could not find matching Pollutant ID for compound " + compound + ", and method " + method);
}
return pollutantID;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,16 @@ public void processFile() throws Exception {
}

int columnOffset = isPM25 ? 1:0;
if(isPM25) compoudString += "_" + line.get(1); //Append the method to the compound
String method = "CONT";
if(isPM25) {
//Append the PM25-specific method to the overall method
method += "_" + line.get(1);
}

//We create 24 records per CSV line, 1 per hour
for(int hour = 0; hour < 24; hour++) {
ContinuousDataRecord record = new ContinuousDataRecord();
record.setPollutantId(getPollutantID(compoudString));
record.setPollutantId(getPollutantID(compoudString, method));
record.setSiteId(getSiteID(
line.get(1 + columnOffset),
line.get(2 + columnOffset),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
public interface IntegratedDataMapper {

@Insert({"<script>",
"INSERT into naps.integrated_data (site_id, pollutant_id, date_time, year, month, day, fine, day_of_week, sample_mass, sample_vol, sample_duration, tsp, data) "
"INSERT into naps.integrated_data (site_id, pollutant_id, date_time, year, month, day, fine, cartridge, media, day_of_week,"
+ "sample_mass, sample_vol, sample_duration, tsp, data) "
+ "values ",
"<foreach collection='dataRecords' item='record' index='index' open='(' separator = '),(' close=')' >"
+ "#{record.siteId}, #{record.pollutantId}, #{record.datetime}, #{record.year}, #{record.month}, #{record.day}, "
+ "#{record.fine, typeHandler=org.apache.ibatis.type.BooleanTypeHandler}, #{record.dayOfWeek}, "
+ "#{record.fine, typeHandler=org.apache.ibatis.type.BooleanTypeHandler}, #{record.cartridge}, #{record.media}, #{record.dayOfWeek}, "
+ "#{record.mass}, #{record.volume}, #{record.tsp}, #{record.duration}, #{record.data}"
+ "</foreach>"
+ " ON CONFLICT DO NOTHING;",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
public class IntegratedDataRecord extends DataRecord {

private Boolean fine;
private String cartridge;
private String media;
private BigDecimal mass;
private BigDecimal volume;
private BigDecimal tsp;
Expand Down Expand Up @@ -52,5 +54,21 @@ public Double getDuration() {

public void setDuration(Double duration) {
this.duration = duration;
}

//Returns the sample cartridge identifier, or null when it was absent from the source data.
public String getCartridge() {
return cartridge;
}

//Sets the sample cartridge identifier; null indicates the value was absent.
public void setCartridge(String cartridge) {
this.cartridge = cartridge;
}

//Returns the sample media identifier, or null when it was absent from the source data.
public String getMedia() {
return media;
}

//Sets the sample media identifier; null indicates the value was absent.
public void setMedia(String media) {
this.media = media;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.dbf.naps.data.loader.NAPSDataLoader;
import com.dbf.naps.data.loader.integrated.runner.CFFileLoadRunner;
import com.dbf.naps.data.loader.integrated.runner.SampleMetaDataFileLoadRunner;
import com.dbf.naps.data.loader.integrated.runner.SpeciationFileLoadRunner;

public class NAPSIntegratedDataLoader extends NAPSDataLoader {

Expand All @@ -26,12 +27,37 @@ protected List<Class<?>> getDBMappers() {
/**
 * Selects the appropriate file-load runner for the given NAPS integrated data
 * file. The file name suffix identifies the dataset, which determines both the
 * runner type and the method label recorded for each pollutant.
 *
 * @param dataFile the raw data file to be processed
 * @return a runnable that loads the file's contents
 * @throws IllegalArgumentException when the file name matches no known dataset
 */
protected Runnable processFile(File dataFile) {
	String fileName = dataFile.getName().toUpperCase();

	//These two datasets are handled by the CF runner
	if (fileName.endsWith("_DICH.XLS")) {
		return new CFFileLoadRunner(getThreadID(), getOptions(), getSqlSessionFactory(), dataFile, "DICHOT");
	}
	if (fileName.endsWith("_PART25.XLS")) {
		return new CFFileLoadRunner(getThreadID(), getOptions(), getSqlSessionFactory(), dataFile, "PART25");
	}

	//Datasets that carry per-sample metadata. Every pattern includes the
	//leading underscore and the ".XLS" extension, so matching is unambiguous.
	for (String method : new String[] {"PAH", "HCB", "VOC", "PCDD", "PCB"}) {
		if (fileName.endsWith("_" + method + ".XLS")) {
			return new SampleMetaDataFileLoadRunner(getThreadID(), getOptions(), getSqlSessionFactory(), dataFile, method);
		}
	}

	//Speciation datasets
	for (String method : new String[] {"CARB", "IC", "ICPMS", "NA", "NH4", "SPEC", "WICPMS", "LEV"}) {
		if (fileName.endsWith("_" + method + ".XLS")) {
			return new SpeciationFileLoadRunner(getThreadID(), getOptions(), getSqlSessionFactory(), dataFile, method);
		}
	}

	throw new IllegalArgumentException("Unsupported data file: " + dataFile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
*/
public class CFFileLoadRunner extends IntegratedFileLoadRunner {

public CFFileLoadRunner(int threadId, LoaderOptions config, SqlSessionFactory sqlSessionFactory, File rawFile) {
super(threadId, config, sqlSessionFactory, rawFile);
//Delegates to the base integrated runner; the method label distinguishes the
//DICHOT and PART25 datasets, which share the CF file format.
public CFFileLoadRunner(int threadId, LoaderOptions config, SqlSessionFactory sqlSessionFactory, File rawFile, String method) {
super(threadId, config, sqlSessionFactory, rawFile, method);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,34 @@ public class IntegratedFileLoadRunner extends FileLoadRunner {
DEFAULT_IGNORED_HEADERS.add("SURROGATE"); //Surrogate Recovery
DEFAULT_IGNORED_HEADERS.add("48 H"); //Not sure why this is a column
DEFAULT_IGNORED_HEADERS.add("CANISTER"); //Canister ID#
DEFAULT_IGNORED_HEADERS.add("CART"); //Cart, Cartridge
DEFAULT_IGNORED_HEADERS.add("START"); //Start Time
DEFAULT_IGNORED_HEADERS.add("END"); //End Time
DEFAULT_IGNORED_HEADERS.add("DURATION"); //Duration
DEFAULT_IGNORED_HEADERS.add("SUM"); //Sum PCB TEQ*
DEFAULT_IGNORED_HEADERS.add("FIELD"); //Field ID
}

public IntegratedFileLoadRunner(int threadId, LoaderOptions config, SqlSessionFactory sqlSessionFactory, File rawFile) {
super(threadId, config, sqlSessionFactory, rawFile);
DEFAULT_IGNORED_HEADERS.add("SPECIATION"); //Speciation Mass (ug/m3)
DEFAULT_IGNORED_HEADERS.add("MEDIA"); //Media
DEFAULT_IGNORED_HEADERS.add("FRACTION"); //Fraction
DEFAULT_IGNORED_HEADERS.add("DICH"); //Dich/Partisol Mass (ug/m3)
DEFAULT_IGNORED_HEADERS.add("PRESS"); //PRESS
DEFAULT_IGNORED_HEADERS.add("TEMP"); //TEMP
DEFAULT_IGNORED_HEADERS.add("WS"); //WS
DEFAULT_IGNORED_HEADERS.add("HUM"); //HUM
DEFAULT_IGNORED_HEADERS.add("TDP"); //TDP
DEFAULT_IGNORED_HEADERS.add("WD"); //WD
}

private ExcelSheet sheet;
private Integer siteID; //Track the site id to read it only once
private Integer headerRowNumber; //Track when we have reached the real header row
private Integer lastColumn; //Track when we have reached the NAPS ID which represents the last column
private final String method;

//Builds a runner for one integrated data file. The dataset-specific method
//label is captured here and later stored per pollutant.
public IntegratedFileLoadRunner(int threadId, LoaderOptions config, SqlSessionFactory sqlSessionFactory, File rawFile, String method) {
super(threadId, config, sqlSessionFactory, rawFile);
//Namespace all integrated-dataset methods with "INT_" to keep them distinct
//from continuous-data methods (which use the "CONT" prefix).
this.method = "INT_" + method;
}

/**
* Main entry-point method for processing the sheet.
Expand Down Expand Up @@ -186,6 +200,9 @@ protected List<IntegratedDataRecord> processRow(int row, Date date) {
*/
private boolean isColumnIgnored(String columnHeader) {
columnHeader = columnHeader.toUpperCase();
//ID is a special case that uses an exact match otherwise we might match
//a compound that either start or end with the letter "id"
if (columnHeader.equals("ID")) return true;
for(String ignoredHeader : getIgnoredColumnList()) {
if (columnHeader.startsWith(ignoredHeader) || columnHeader.endsWith(ignoredHeader)) return true;
}
Expand Down Expand Up @@ -226,7 +243,7 @@ protected IntegratedDataRecord processSingleRecord(String columnHeader, String c
IntegratedDataRecord record = new IntegratedDataRecord();
record.setDatetime(date);
record.setSiteId(siteID);
record.setPollutantId(getPollutantID(columnHeader));
record.setPollutantId(getPollutantID(columnHeader, method));

//Treat less-than as zeros (below detection limit)
if (cellValue.startsWith("<") || "N.D.".equals(cellValue)) {
Expand All @@ -243,6 +260,8 @@ protected IntegratedDataRecord processSingleRecord(String columnHeader, String c
return record;
}



/**
* Inserts or updates the provided IntegratedDataRecord records into the database.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
public class SampleMetaDataFileLoadRunner extends IntegratedFileLoadRunner {

public SampleMetaDataFileLoadRunner(int threadId, LoaderOptions config, SqlSessionFactory sqlSessionFactory, File rawFile) {
super(threadId, config, sqlSessionFactory, rawFile);
//Delegates to the base integrated runner, tagging all loaded records with the
//dataset-specific method label (PAH, HCB, VOC, PCDD, or PCB).
public SampleMetaDataFileLoadRunner(int threadId, LoaderOptions config, SqlSessionFactory sqlSessionFactory, File rawFile, String method) {
super(threadId, config, sqlSessionFactory, rawFile, method);
}

//Store these column indexes so we only have to look them up once for the entire sheet
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.dbf.naps.data.loader.integrated.runner;

import java.io.File;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
import org.apache.ibatis.session.SqlSessionFactory;
import com.dbf.naps.data.loader.LoaderOptions;
import com.dbf.naps.data.loader.integrated.IntegratedDataRecord;
import com.dbf.naps.data.utilities.DataCleaner;

/**
* Extends the base IntegratedFileLoadRunner class to add support for speciation metadata.
*/
public class SpeciationFileLoadRunner extends IntegratedFileLoadRunner {

	//Per-sheet cached column indexes for the speciation metadata columns.
	//Resolved lazily because each sheet lays its columns out differently.
	private Integer massCol;
	private Integer speciationMassCol;
	private Integer startTimeCol;
	private Integer endTimeCol;
	private Integer cartridgeCol;
	private Integer mediaCol;

	public SpeciationFileLoadRunner(int threadId, LoaderOptions config, SqlSessionFactory sqlSessionFactory, File rawFile, String method) {
		super(threadId, config, sqlSessionFactory, rawFile, method);
	}

	/**
	 * Resolves the metadata column indexes, caching each result so each lookup
	 * happens at most once per sheet. Not every column is present in every
	 * sheet; getColumnIndex() does not throw for a missing header (presumably
	 * it returns null, in which case that lookup is simply retried per row —
	 * TODO confirm).
	 */
	@Override
	protected void preProcessRow() {
		if (massCol == null)           massCol           = getColumnIndex("Mass");
		if (speciationMassCol == null) speciationMassCol = getColumnIndex("Speciation Mass");
		if (startTimeCol == null)      startTimeCol      = getColumnIndex("Start"); //Header is "Start_time" or "Start Time" depending on the sheet
		if (endTimeCol == null)        endTimeCol        = getColumnIndex("End");
		if (cartridgeCol == null)      cartridgeCol      = getColumnIndex("Cart"); //Header is "Cart" or "Cartridge" depending on the sheet
		if (mediaCol == null)          mediaCol          = getColumnIndex("Media");
	}

	/**
	 * Builds the row's data records via the base class, then stamps every
	 * record with the speciation metadata extracted from this row.
	 */
	@Override
	protected List<IntegratedDataRecord> processRow(int row, Date date) {

		BigDecimal sampleMass = DataCleaner.extractDecimalData(getSheet().getCellContents(massCol, row), true);

		//TODO: This is separate from sample mass and should be its own column
		BigDecimal speciationMass = DataCleaner.extractDecimalData(getSheet().getCellContents(speciationMassCol, row), true);

		//TODO: Handle the "Dich/Partisol Mass (ug/m3)" column?

		Double start = DataCleaner.extractDoubleData(getSheet().getCellContents(startTimeCol, row), true);
		Double end   = DataCleaner.extractDoubleData(getSheet().getCellContents(endTimeCol, row), true);
		Double duration = (start == null || end == null) ? null : (end - start);

		String cartridge = emptyToNull(getSheet().getCellContents(cartridgeCol, row));
		String media     = emptyToNull(getSheet().getCellContents(mediaCol, row));

		List<IntegratedDataRecord> records = super.processRow(row, date);
		for (IntegratedDataRecord record : records) {
			//Enhance the base records with the metadata specific to this dataset
			record.setDuration(duration);
			record.setMedia(media);
			record.setCartridge(cartridge);
			record.setMass(sampleMass);
		}
		return records;
	}

	//Maps an empty cell value to null so blank cells become SQL NULLs.
	private static String emptyToNull(String value) {
		return "".equals(value) ? null : value;
	}
}
6 changes: 4 additions & 2 deletions src/main/java/com/dbf/naps/data/utilities/DataCleaner.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,14 @@ private static double convertDurationToHours(String duration) {
// Split the duration string into hours, minutes, and seconds
String[] parts = duration.split(":");

if (parts.length != 3) throw new NumberFormatException("Unparsable duration string: " + duration);
if (parts.length > 3 || parts.length < 2)
throw new NumberFormatException("Unparsable duration string: " + duration);

// Parse the parts into integers
int hours = Integer.parseInt(parts[0]);
int minutes = Integer.parseInt(parts[1]);
int seconds = Integer.parseInt(parts[2]);
//Some durations are in the form of "24:00" instead of "24:00:00"
int seconds = parts.length == 3 ? Integer.parseInt(parts[2]) : 0;

// Convert minutes and seconds to hours
double minutesToHours = minutes / 60.0;
Expand Down
11 changes: 6 additions & 5 deletions src/main/resources/schema/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ CREATE TABLE IF NOT EXISTS naps.pollutants
(
id SERIAL NOT NULL,
name VARCHAR(255) NOT NULL,
method VARCHAR(255) NOT NULL,
PRIMARY KEY (id)
);

CREATE UNIQUE INDEX IF NOT EXISTS idx_pollutants_name ON naps.pollutants (name ASC);

-- Insert the basic pollutants
INSERT into naps.pollutants (name) values ('CO'), ('NO'), ('NO2'), ('NOX'), ('O3'), ('PM10'), ('PM25'), ('SO2') ON CONFLICT DO NOTHING;
-- Pollutants are now unique on (name, method).
-- NOTE(review): the index name still says "name" only, and IF NOT EXISTS will
-- not replace a pre-existing single-column index of the same name in an
-- already-provisioned database -- a migration may need to drop the old index first.
CREATE UNIQUE INDEX IF NOT EXISTS idx_pollutants_name ON naps.pollutants (name, method ASC);
CREATE INDEX IF NOT EXISTS idx_pollutants_method ON naps.pollutants (method ASC);

CREATE TABLE IF NOT EXISTS naps.sites
(
Expand Down Expand Up @@ -81,6 +80,8 @@ CREATE TABLE IF NOT EXISTS naps.integrated_data
day smallint not null,
day_of_week smallint not null,
fine boolean null,
cartridge VARCHAR(2) null,
media VARCHAR(2) null,
sample_mass NUMERIC(12,6) NULL,
sample_vol NUMERIC(12,6) NULL,
sample_duration double precision NULL,
Expand All @@ -96,7 +97,7 @@ CREATE TABLE IF NOT EXISTS naps.integrated_data
ON DELETE CASCADE
);

CREATE UNIQUE INDEX IF NOT EXISTS idx_integrated_data_pk ON naps.integrated_data (site_id, pollutant_id, date_time, fine);
CREATE UNIQUE INDEX IF NOT EXISTS idx_integrated_data_pk ON naps.integrated_data (site_id, pollutant_id, date_time, fine, cartridge, media);
CREATE INDEX IF NOT EXISTS idx_integrated_data_pollutant_id ON naps.integrated_data (pollutant_id ASC);
CREATE INDEX IF NOT EXISTS idx_integrated_data_date_time ON naps.integrated_data (date_time ASC);
CREATE INDEX IF NOT EXISTS idx_integrated_data_year ON naps.integrated_data (year ASC);
Expand Down
Binary file modified target/naps_data.jar
Binary file not shown.

0 comments on commit 0761070

Please sign in to comment.