Skip to content

Commit

Permalink
backup
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies committed Apr 16, 2024
1 parent 7d5bc5a commit 1cf8d99
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ public class TsFilePieceMessagePayload implements SubscriptionMessagePayload {

private transient byte[] filePiece;

public String getFileName() {
return fileName;
}

public long getEndWritingOffset() {
return endWritingOffset;
}

public byte[] getFilePiece() {
return filePiece;
}

public TsFilePieceMessagePayload() {}

public TsFilePieceMessagePayload(String fileName, long endWritingOffset, byte[] filePiece) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ public class TsFileSealMessagePayload implements SubscriptionMessagePayload {

private transient long fileLength;

public String getFileName() {
return fileName;
}

public long getFileLength() {
return fileLength;
}

public TsFileSealMessagePayload() {}

public TsFileSealMessagePayload(String fileName, long fileLength) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.payload.common.SubscriptionCommitContext;
import org.apache.iotdb.session.util.SessionUtils;
import org.apache.iotdb.tsfile.utils.Pair;

import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -60,8 +66,8 @@ public abstract class SubscriptionConsumer implements AutoCloseable {
private final String username;
private final String password;

private final String consumerId;
private final String consumerGroupId;
protected final String consumerId;
protected final String consumerGroupId;

private final long heartbeatIntervalMs;
private final long endpointsSyncIntervalMs;
Expand All @@ -83,6 +89,24 @@ public String getConsumerGroupId() {
return consumerGroupId;
}

/////////////////////////////// tsfile dir ///////////////////////////////

protected Path subscribedTsFileBaseDirPath;

protected final Map<SubscriptionCommitContext, Pair<File, RandomAccessFile>>
commitContextToTsFile = new ConcurrentHashMap<>();

public Path getTsFileDir(String topicName) throws IOException {
if (Objects.isNull(subscribedTsFileBaseDirPath)) {
subscribedTsFileBaseDirPath =
Files.createTempDirectory(
String.format("subscribedTsFile_%s_%s", consumerId, consumerGroupId));
}
Path dirPath = subscribedTsFileBaseDirPath.resolve(topicName);
Files.createDirectories(dirPath);
return dirPath;
}

/////////////////////////////// ctor ///////////////////////////////

protected SubscriptionConsumer(Builder builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,4 @@

package org.apache.iotdb.session.subscription;

public interface SubscriptionMessagePayload extends AutoCloseable {

void open() throws Exception;
}
public interface SubscriptionMessagePayload {}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,22 @@
import org.apache.iotdb.rpc.subscription.payload.common.SubscriptionPollMessage;
import org.apache.iotdb.rpc.subscription.payload.common.SubscriptionPollMessageType;
import org.apache.iotdb.rpc.subscription.payload.common.SubscriptionPolledMessage;
import org.apache.iotdb.rpc.subscription.payload.common.SubscriptionPolledMessageType;
import org.apache.iotdb.rpc.subscription.payload.common.TabletsMessagePayload;
import org.apache.iotdb.rpc.subscription.payload.common.TsFileInfoMessagePayload;
import org.apache.iotdb.rpc.subscription.payload.common.TsFilePieceMessagePayload;
import org.apache.iotdb.rpc.subscription.payload.common.TsFileSealMessagePayload;
import org.apache.iotdb.tsfile.utils.Pair;

import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -51,7 +61,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

public class SubscriptionPullConsumer extends SubscriptionConsumer {

Expand All @@ -65,6 +74,11 @@ public class SubscriptionPullConsumer extends SubscriptionConsumer {

private final AtomicBoolean isClosed = new AtomicBoolean(true);

@Override
boolean isClosed() {
return isClosed.get();
}

/////////////////////////////// ctor ///////////////////////////////

public SubscriptionPullConsumer(SubscriptionPullConsumer.Builder builder) {
Expand Down Expand Up @@ -171,10 +185,37 @@ public List<SubscriptionMessage> poll(Set<String> topicNames, long timeoutMs)
releaseReadLock();
}

List<SubscriptionMessage> messages =
polledMessages.stream()
.map(polledMessage -> SubscriptionPolledMessageParser.parse(this, polledMessage))
.collect(Collectors.toList());
final List<SubscriptionMessage> messages = new ArrayList<>();
for (final SubscriptionPolledMessage polledMessage : polledMessages) {
final short messageType = polledMessage.getMessageType();
if (SubscriptionPolledMessageType.isValidatedMessageType(messageType)) {
switch (SubscriptionPolledMessageType.valueOf(messageType)) {
case TABLETS:
messages.add(
new SubscriptionMessage(
polledMessage.getCommitContext(),
((TabletsMessagePayload) polledMessage.getMessagePayload()).getTablets()));
break;
case TS_FILE_INFO:
try {
final SubscriptionMessage message =
pollTsFile(
polledMessage.getCommitContext(),
((TsFileInfoMessagePayload) polledMessage.getMessagePayload()).getFileName(),
timeoutMs);
if (Objects.isNull(message)) {
throw new Exception("xxx");
}
messages.add(message);
} catch (Exception e) {
LOGGER.warn(e.getMessage());
}
break;
default:
break;
}
}
}

if (autoCommit) {
long currentTimestamp = System.currentTimeMillis();
Expand All @@ -190,7 +231,64 @@ public List<SubscriptionMessage> poll(Set<String> topicNames, long timeoutMs)
return messages;
}

List<SubscriptionPolledMessage> pollTsFile(
private SubscriptionMessage pollTsFile(
SubscriptionCommitContext commitContext, String fileName, long timeoutMs)
throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
final int dataNodeId = commitContext.getDataNodeId();
final String topicName = commitContext.getTopicName();

final Path filePath = getTsFileDir(topicName).resolve(fileName);
Files.createFile(filePath);
final File file = filePath.toFile();
final RandomAccessFile fileWriter = new RandomAccessFile(file, "rw");
commitContextToTsFile.put(commitContext, new Pair<>(file, fileWriter));

long endWritingOffset = 0;
while (true) {
final List<SubscriptionPolledMessage> polledMessages =
pollTsFileInternal(dataNodeId, topicName, fileName, endWritingOffset, timeoutMs);
if (Objects.isNull(polledMessages) || polledMessages.size() != 1) {
return null;
}
final SubscriptionPolledMessage polledMessage = polledMessages.get(0);
final short messageType = polledMessage.getMessageType();
if (SubscriptionPolledMessageType.isValidatedMessageType(messageType)) {
switch (SubscriptionPolledMessageType.valueOf(messageType)) {
case TS_FILE_PIECE:
{
final TsFilePieceMessagePayload messagePayload =
(TsFilePieceMessagePayload) polledMessage.getMessagePayload();
if (!fileName.equals(messagePayload.getFileName())) {
return null;
}
fileWriter.write(messagePayload.getFilePiece());
fileWriter.getFD().sync();
endWritingOffset = messagePayload.getEndWritingOffset();
break;
}
case TS_FILE_SEAL:
{
final TsFileSealMessagePayload messagePayload =
(TsFileSealMessagePayload) polledMessage.getMessagePayload();
if (!fileName.equals(messagePayload.getFileName())) {
return null;
}
if (fileWriter.length() != messagePayload.getFileLength()) {
return null;
}
fileWriter.getFD().sync();
fileWriter.close();
commitContextToTsFile.remove(commitContext);
break;
}
default:
break;
}
}
}
}

private List<SubscriptionPolledMessage> pollTsFileInternal(
int dataNodeId, String topicName, String fileName, long endWritingOffset, long timeoutMs)
throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
acquireReadLock();
Expand Down Expand Up @@ -296,10 +394,6 @@ private void commitAllUncommittedMessages() {
}
}

boolean isClosed() {
return isClosed.get();
}

long getAutoCommitIntervalMs() {
return autoCommitIntervalMs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,4 @@ public Iterator<SubscriptionSessionDataSet> iterator() {
public Iterator<Tablet> tabletIterator() {
return tablets.iterator();
}

@Override
public void open() {
// do nothing
}

@Override
public void close() throws Exception {
for (SubscriptionSessionDataSet dataSet : dataSetList) {
dataSet.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,17 @@
import org.apache.iotdb.tsfile.read.TsFileReader;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;

import java.io.IOException;

public class SubscriptionTsFileReader implements SubscriptionMessagePayload {

private final String fileName;

private TsFileReader reader;

public SubscriptionTsFileReader(String fileName) {
this.fileName = fileName;
}

@Override
public void open() throws Exception {
reader = new TsFileReader(new TsFileSequenceReader(fileName));
}

@Override
public void close() throws Exception {
reader.close();
public TsFileReader open() throws IOException {
return new TsFileReader(new TsFileSequenceReader(fileName));
}
}

0 comments on commit 1cf8d99

Please sign in to comment.