Skip to content

Commit

Permalink
backup
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies committed Apr 16, 2024
1 parent 7b86711 commit 7d5bc5a
Show file tree
Hide file tree
Showing 20 changed files with 379 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ public long getEndWritingOffset() {
return endWritingOffset;
}

public PollTsFileMessagePayload() {}

public PollTsFileMessagePayload(String topicName, String fileName, long endWritingOffset) {
this.topicName = topicName;
this.fileName = fileName;
this.endWritingOffset = endWritingOffset;
}

@Override
public void serialize(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(topicName, stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ public static SubscriptionPolledMessage deserialize(final ByteBuffer buffer) {
case TS_FILE_INFO:
messagePayload = new TsFileInfoMessagePayload().deserialize(buffer);
break;
case TS_FILE_PIECE:
messagePayload = new TsFilePieceMessagePayload().deserialize(buffer);
break;
case TS_FILE_SEAL:
messagePayload = new TsFileSealMessagePayload().deserialize(buffer);
break;
default:
messagePayload = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@

public class TsFileInfoMessagePayload implements SubscriptionMessagePayload {

protected transient String fileName;
private transient String fileName;

public String getFileName() {
return fileName;
}

public TsFileInfoMessagePayload() {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.rpc.subscription.payload.common;

import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class TsFilePieceMessagePayload implements SubscriptionMessagePayload {

private transient String fileName;

private transient long endWritingOffset;

private transient byte[] filePiece;

public TsFilePieceMessagePayload() {}

public TsFilePieceMessagePayload(String fileName, long endWritingOffset, byte[] filePiece) {
this.fileName = fileName;
this.endWritingOffset = endWritingOffset;
this.filePiece = filePiece;
}

@Override
public void serialize(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(fileName, stream);
ReadWriteIOUtils.write(endWritingOffset, stream);
ReadWriteIOUtils.write(new Binary(filePiece), stream);
}

@Override
public SubscriptionMessagePayload deserialize(ByteBuffer buffer) {
this.fileName = ReadWriteIOUtils.readString(buffer);
this.endWritingOffset = ReadWriteIOUtils.readLong(buffer);
final int size = ReadWriteIOUtils.readInt(buffer);
this.filePiece = ReadWriteIOUtils.readBytes(buffer, size);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.rpc.subscription.payload.common;

import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class TsFileSealMessagePayload implements SubscriptionMessagePayload {

private transient String fileName;

private transient long fileLength;

public TsFileSealMessagePayload() {}

public TsFileSealMessagePayload(String fileName, long fileLength) {
this.fileName = fileName;
this.fileLength = fileLength;
}

@Override
public void serialize(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(fileName, stream);
ReadWriteIOUtils.write(fileLength, stream);
}

@Override
public SubscriptionMessagePayload deserialize(ByteBuffer buffer) {
this.fileName = ReadWriteIOUtils.readString(buffer);
this.fileLength = ReadWriteIOUtils.readLong(buffer);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -67,8 +66,8 @@ public abstract class SubscriptionConsumer implements AutoCloseable {
private final long heartbeatIntervalMs;
private final long endpointsSyncIntervalMs;

private final SortedMap<Integer, SubscriptionProvider> subscriptionProviders =
new ConcurrentSkipListMap<>();
private final Map<Integer, SubscriptionProvider> subscriptionProviders =
new ConcurrentHashMap<>();
private final ReentrantReadWriteLock subscriptionProvidersLock = new ReentrantReadWriteLock(true);

private ScheduledExecutorService heartbeatWorkerExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public class SubscriptionMessage implements Comparable<SubscriptionMessage> {

private final SubscriptionCommitContext commitContext;

// TODO: support more data format
private final SubscriptionMessagePayload payload;

public SubscriptionMessage(SubscriptionCommitContext commitContext, List<Tablet> tablets) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@

package org.apache.iotdb.session.subscription;

public interface SubscriptionMessagePayload extends AutoCloseable {}
public interface SubscriptionMessagePayload extends AutoCloseable {

void open() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,33 @@
import org.apache.iotdb.rpc.subscription.payload.common.SubscriptionPolledMessage;
import org.apache.iotdb.rpc.subscription.payload.common.SubscriptionPolledMessageType;
import org.apache.iotdb.rpc.subscription.payload.common.TabletsMessagePayload;
import org.apache.iotdb.rpc.subscription.payload.common.TsFileInfoMessagePayload;

public class SubscriptionRawMessageParser {
public class SubscriptionPolledMessageParser {

private SubscriptionRawMessageParser() {}
private SubscriptionPolledMessageParser() {}

public static SubscriptionMessage parse(SubscriptionPolledMessage rawMessage) {
short messageType = rawMessage.getMessageType();
public static SubscriptionMessage parse(
SubscriptionPullConsumer consumer, SubscriptionPolledMessage polledMessage) {
short messageType = polledMessage.getMessageType();
if (SubscriptionPolledMessageType.isValidatedMessageType(messageType)) {
switch (SubscriptionPolledMessageType.valueOf(messageType)) {
case TABLETS:
return new SubscriptionMessage(
rawMessage.getCommitContext(),
((TabletsMessagePayload) rawMessage.getMessagePayload()).getTablets());
polledMessage.getCommitContext(),
((TabletsMessagePayload) polledMessage.getMessagePayload()).getTablets());
case TS_FILE_INFO:
// TODO
try {
consumer.pollTsFile(
polledMessage.getCommitContext().getDataNodeId(),
polledMessage.getCommitContext().getTopicName(),
((TsFileInfoMessagePayload) polledMessage.getMessagePayload()).getFileName(),
0,
0L);
} catch (Exception e) {

}
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.payload.common.PollMessagePayload;
import org.apache.iotdb.rpc.subscription.payload.common.PollTsFileMessagePayload;
import org.apache.iotdb.rpc.subscription.payload.common.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.common.SubscriptionPollMessage;
import org.apache.iotdb.rpc.subscription.payload.common.SubscriptionPollMessageType;
Expand Down Expand Up @@ -151,13 +152,13 @@ public List<SubscriptionMessage> poll(Set<String> topicNames, Duration timeoutMs

public List<SubscriptionMessage> poll(Set<String> topicNames, long timeoutMs)
throws TException, IOException, StatementExecutionException {
List<SubscriptionPolledMessage> rawMessages = new ArrayList<>();
List<SubscriptionPolledMessage> polledMessages = new ArrayList<>();

acquireReadLock();
try {
for (final SubscriptionProvider provider : getAllAvailableProviders()) {
// TODO: network timeout
rawMessages.addAll(
polledMessages.addAll(
provider
.getSessionConnection()
.poll(
Expand All @@ -171,7 +172,9 @@ public List<SubscriptionMessage> poll(Set<String> topicNames, long timeoutMs)
}

List<SubscriptionMessage> messages =
rawMessages.stream().map(SubscriptionRawMessageParser::parse).collect(Collectors.toList());
polledMessages.stream()
.map(polledMessage -> SubscriptionPolledMessageParser.parse(this, polledMessage))
.collect(Collectors.toList());

if (autoCommit) {
long currentTimestamp = System.currentTimeMillis();
Expand All @@ -187,6 +190,30 @@ public List<SubscriptionMessage> poll(Set<String> topicNames, long timeoutMs)
return messages;
}

List<SubscriptionPolledMessage> pollTsFile(
int dataNodeId, String topicName, String fileName, long endWritingOffset, long timeoutMs)
throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
acquireReadLock();
try {
final SubscriptionProvider provider = getProvider(dataNodeId);
if (Objects.isNull(provider) || !provider.isAvailable()) {
throw new IoTDBConnectionException(
String.format(
"something unexpected happened when poll tsfile from subscription provider with data node id %s, the subscription provider may be unavailable or not existed",
dataNodeId));
}
return provider
.getSessionConnection()
.poll(
new SubscriptionPollMessage(
SubscriptionPollMessageType.POLL_TS_FILE.getType(),
new PollTsFileMessagePayload(topicName, fileName, endWritingOffset),
timeoutMs));
} finally {
releaseReadLock();
}
}

public void commitSync(SubscriptionMessage message)
throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
commitSync(Collections.singletonList(message));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,23 @@ public class SubscriptionSessionDataSets
public SubscriptionSessionDataSets(List<Tablet> tablets) {
this.dataSetList = new ArrayList<>();
this.tablets = tablets;
tablets.forEach((tablet -> this.dataSetList.add(new SubscriptionSessionDataSet(tablet))));
}

@Override
public Iterator<SubscriptionSessionDataSet> iterator() {
tablets.forEach((tablet -> this.dataSetList.add(new SubscriptionSessionDataSet(tablet))));
return dataSetList.iterator();
}

public Iterator<Tablet> tabletIterator() {
return tablets.iterator();
}

@Override
public void open() {
// do nothing
}

@Override
public void close() throws Exception {
for (SubscriptionSessionDataSet dataSet : dataSetList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,8 @@

package org.apache.iotdb.session.subscription;

import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.subscription.payload.common.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.common.SubscriptionPolledMessage;
import org.apache.iotdb.tsfile.read.TsFileReader;

import org.apache.thrift.TException;

import java.io.IOException;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;

public class SubscriptionTsFileReader implements SubscriptionMessagePayload {

Expand All @@ -38,9 +32,9 @@ public SubscriptionTsFileReader(String fileName) {
this.fileName = fileName;
}

public void open(SubscriptionPullConsumer consumer, SubscriptionCommitContext commitContext)
throws TException, IOException, StatementExecutionException {
SubscriptionPolledMessage rawMessage;
@Override
public void open() throws Exception {
reader = new TsFileReader(new TsFileSequenceReader(fileName));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ public List<SubscriptionEvent> poll(
return broker.poll(topicNames, timer);
}

public List<SubscriptionEvent> pollTsFile(
ConsumerConfig consumerConfig, String topicName, String fileName, long endWritingOffset) {
final String consumerGroupId = consumerConfig.getConsumerGroupId();
final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
if (Objects.isNull(broker)) {
LOGGER.warn(
"Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId);
return Collections.emptyList();
}
return broker.pollTsFile(topicName, fileName, endWritingOffset);
}

public void commit(
final ConsumerConfig consumerConfig, final List<SubscriptionCommitContext> commitContexts) {
final String consumerGroupId = consumerConfig.getConsumerGroupId();
Expand Down
Loading

0 comments on commit 7d5bc5a

Please sign in to comment.