Skip to content
Open
21 changes: 9 additions & 12 deletions src/main/java/com/oath/halodb/CompactionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@

package com.oath.halodb;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;

class CompactionManager {
private static final Logger logger = LoggerFactory.getLogger(CompactionManager.class);

Expand Down Expand Up @@ -266,11 +266,8 @@ private void copyFreshRecordsToNewFile(int idOfFileToCompact) throws IOException
);
currentWriteFile.getIndexFile().write(newEntry);

int valueOffset = Utils.getValueOffset(currentWriteFileOffset, key);
InMemoryIndexMetaData newMetaData = new InMemoryIndexMetaData(
currentWriteFile.getFileId(), valueOffset,
currentRecordMetaData.getValueSize(), indexFileEntry.getSequenceNumber()
);
InMemoryIndexMetaData newMetaData = currentRecordMetaData
.relocated(currentWriteFile.getFileId(), currentWriteFileOffset);

boolean updated = dbInternal.getInMemoryIndex().replace(key, currentRecordMetaData, newMetaData);
if (updated) {
Expand Down Expand Up @@ -302,7 +299,7 @@ private void copyFreshRecordsToNewFile(int idOfFileToCompact) throws IOException
private boolean isRecordFresh(IndexFileEntry entry, InMemoryIndexMetaData metaData, int idOfFileToMerge) {
return metaData != null
&& metaData.getFileId() == idOfFileToMerge
&& metaData.getValueOffset() == Utils.getValueOffset(entry.getRecordOffset(), entry.getKey());
&& metaData.getValueOffset() == RecordEntry.getValueOffset(entry.getRecordOffset(), entry.getKey().length);
}

private void rollOverCurrentWriteFile(int recordSize) throws IOException {
Expand Down
3 changes: 0 additions & 3 deletions src/main/java/com/oath/halodb/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.spi.FileSystemProvider;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/com/oath/halodb/HaloDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@

package com.oath.halodb;

import com.google.common.annotations.VisibleForTesting;

import java.io.File;
import java.io.IOException;
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;

public final class HaloDB {

Expand Down
70 changes: 35 additions & 35 deletions src/main/java/com/oath/halodb/HaloDBFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@

package com.oath.halodb;

import com.google.common.primitives.Ints;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;

import java.io.File;
import java.io.IOException;
Expand All @@ -21,8 +19,10 @@
import java.util.function.BiFunction;
import java.util.regex.Matcher;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.primitives.Ints;

/**
* Represents a data file and its associated index file.
Expand All @@ -32,7 +32,8 @@ class HaloDBFile {

private volatile int writeOffset;

private FileChannel channel;
private final FileChannel channel;
private final RandomAccessFile raf;
private final File backingFile;
private final DBDirectory dbDirectory;
private final int fileId;
Expand All @@ -49,13 +50,14 @@ class HaloDBFile {
private final FileType fileType;

private HaloDBFile(int fileId, File backingFile, DBDirectory dbDirectory, IndexFile indexFile, FileType fileType,
FileChannel channel, HaloDBOptions options) throws IOException {
RandomAccessFile raf, HaloDBOptions options) throws IOException {
this.fileId = fileId;
this.backingFile = backingFile;
this.dbDirectory = dbDirectory;
this.indexFile = indexFile;
this.fileType = fileType;
this.channel = channel;
this.raf = raf;
this.channel = raf.getChannel();
this.writeOffset = Ints.checkedCast(channel.size());
this.options = options;
}
Expand All @@ -80,20 +82,22 @@ int readFromFile(long position, ByteBuffer destinationBuffer) throws IOException
return (int)(currentPosition - position);
}

private Record readRecord(int offset) throws HaloDBException, IOException {
private RecordEntry readRecord(int offset) throws HaloDBException, IOException {
long tempOffset = offset;

// read the header from disk.
ByteBuffer headerBuf = ByteBuffer.allocate(Record.Header.HEADER_SIZE);
ByteBuffer headerBuf = ByteBuffer.allocate(RecordEntry.Header.HEADER_SIZE);
int readSize = readFromFile(offset, headerBuf);
if (readSize != Record.Header.HEADER_SIZE) {
if (readSize != RecordEntry.Header.HEADER_SIZE) {
throw new HaloDBException("Corrupted header at " + offset + " in file " + fileId);
}
tempOffset += readSize;

Record.Header header = Record.Header.deserialize(headerBuf);
if (!Record.Header.verifyHeader(header)) {
throw new HaloDBException("Corrupted header at " + offset + " in file " + fileId);
RecordEntry.Header header;
try {
header = RecordEntry.Header.deserialize(headerBuf);
} catch (IllegalArgumentException e) {
throw new HaloDBException("Corrupted header at " + offset + " in file " + fileId, e);
}

// read key-value from disk.
Expand All @@ -103,29 +107,25 @@ private Record readRecord(int offset) throws HaloDBException, IOException {
throw new HaloDBException("Corrupted record at " + offset + " in file " + fileId);
}

Record record = Record.deserialize(recordBuf, header.getKeySize(), header.getValueSize());
record.setHeader(header);
int valueOffset = offset + Record.Header.HEADER_SIZE + header.getKeySize();
record.setRecordMetaData(new InMemoryIndexMetaData(fileId, valueOffset, header.getValueSize(), header.getSequenceNumber()));
RecordEntry record = RecordEntry.deserialize(header, recordBuf);
return record;
}

InMemoryIndexMetaData writeRecord(Record record) throws IOException {
InMemoryIndexMetaData writeRecord(RecordEntry record) throws IOException {
writeToChannel(record.serialize());

int recordSize = record.getRecordSize();
int recordOffset = writeOffset;
writeOffset += recordSize;

IndexFileEntry indexFileEntry = new IndexFileEntry(
record.getKey(), recordSize,
record.getKey(), recordSize,
recordOffset, record.getSequenceNumber(),
Versions.CURRENT_INDEX_FILE_VERSION, -1
);
indexFile.write(indexFileEntry);

int valueOffset = Utils.getValueOffset(recordOffset, record.getKey());
return new InMemoryIndexMetaData(fileId, valueOffset, record.getValue().length, record.getSequenceNumber());
return new InMemoryIndexMetaData(indexFileEntry, fileId);
}

void rebuildIndexFile() throws IOException {
Expand All @@ -137,7 +137,7 @@ void rebuildIndexFile() throws IOException {
HaloDBFileIterator iterator = new HaloDBFileIterator();
int offset = 0;
while (iterator.hasNext()) {
Record record = iterator.next();
RecordEntry record = iterator.next();
IndexFileEntry indexFileEntry = new IndexFileEntry(
record.getKey(), record.getRecordSize(),
offset, record.getSequenceNumber(),
Expand All @@ -162,7 +162,7 @@ HaloDBFile repairFile(DBDirectory dbDirectory) throws IOException {
HaloDBFileIterator iterator = new HaloDBFileIterator();
int count = 0;
while (iterator.hasNext()) {
Record record = iterator.next();
RecordEntry record = iterator.next();
// if the header is corrupted iterator will return null.
if (record != null && record.verifyChecksum()) {
repairFile.writeRecord(record);
Expand Down Expand Up @@ -191,10 +191,10 @@ private HaloDBFile createRepairFile() throws IOException {
repairFile.delete();
}

FileChannel channel = new RandomAccessFile(repairFile, "rw").getChannel();
RandomAccessFile raf = new RandomAccessFile(repairFile, "rw");
IndexFile indexFile = new IndexFile(fileId, dbDirectory, options);
indexFile.createRepairFile();
return new HaloDBFile(fileId, repairFile, dbDirectory, indexFile, fileType, channel, options);
return new HaloDBFile(fileId, repairFile, dbDirectory, indexFile, fileType, raf, options);
}

private long writeToChannel(ByteBuffer[] buffers) throws IOException {
Expand Down Expand Up @@ -252,11 +252,11 @@ int getFileId() {

static HaloDBFile openForReading(DBDirectory dbDirectory, File filename, FileType fileType, HaloDBOptions options) throws IOException {
int fileId = HaloDBFile.getFileTimeStamp(filename);
FileChannel channel = new RandomAccessFile(filename, "r").getChannel();
RandomAccessFile raf = new RandomAccessFile(filename, "r");
IndexFile indexFile = new IndexFile(fileId, dbDirectory, options);
indexFile.open();

return new HaloDBFile(fileId, filename, dbDirectory, indexFile, fileType, channel, options);
return new HaloDBFile(fileId, filename, dbDirectory, indexFile, fileType, raf, options);
}

static HaloDBFile create(DBDirectory dbDirectory, int fileId, HaloDBOptions options, FileType fileType) throws IOException {
Expand All @@ -269,23 +269,23 @@ static HaloDBFile create(DBDirectory dbDirectory, int fileId, HaloDBOptions opti
file = toFile.apply(dbDirectory, fileId);
}

FileChannel channel = new RandomAccessFile(file, "rw").getChannel();
RandomAccessFile raf = new RandomAccessFile(file, "rw");
//TODO: setting the length might improve performance.
//file.setLength(max_);

IndexFile indexFile = new IndexFile(fileId, dbDirectory, options);
indexFile.create();

return new HaloDBFile(fileId, file, dbDirectory, indexFile, fileType, channel, options);
return new HaloDBFile(fileId, file, dbDirectory, indexFile, fileType, raf, options);
}

HaloDBFileIterator newIterator() throws IOException {
return new HaloDBFileIterator();
}

void close() throws IOException {
if (channel != null) {
channel.close();
if (raf != null) {
raf.close();
}
if (indexFile != null) {
indexFile.close();
Expand Down Expand Up @@ -349,8 +349,8 @@ public boolean hasNext() {
}

@Override
public Record next() {
Record record;
public RecordEntry next() {
RecordEntry record;
try {
record = readRecord(currentOffset);
} catch (IOException | HaloDBException e) {
Expand Down
Loading