Skip to content

Commit

Permalink
Merge branch '1.4.1-develop' into rel/1.4.1_arenadata1
Browse files Browse the repository at this point in the history
  • Loading branch information
Asmoday committed Oct 22, 2024
2 parents ae96397 + ac2e783 commit 3cc2a08
Show file tree
Hide file tree
Showing 85 changed files with 3,011 additions and 878 deletions.
2 changes: 1 addition & 1 deletion NOTICE.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Apache Ozone
Copyright 2022 The Apache Software Foundation
Copyright 2024 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Expand Down
98 changes: 98 additions & 0 deletions dev-support/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>ozone-main</artifactId>
<groupId>org.apache.ozone</groupId>
<version>1.4.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ozone-dev-support</artifactId>
<description>Helper module for sharing resources among projects</description>
<name>Apache Ozone Dev Support</name>

<properties>
<failIfNoTests>false</failIfNoTests>
</properties>
<build>
<resources>
<resource>
<directory>${project.build.directory}/extra-resources</directory>
<targetPath>META-INF</targetPath>
<includes>
<include>LICENSE.txt</include>
<include>NOTICE.txt</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<!-- copy L&N files to target/extra-resources -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-resources</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/extra-resources</outputDirectory>
<resources>
<resource>
<directory>../</directory>
<includes>
<include>LICENSE.txt</include>
<include>NOTICE.txt</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<!-- add entries for L&N files to remote-resources.xml in jar file -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-remote-resources-plugin</artifactId>
<executions>
<execution>
<phase>process-resources</phase>
<goals>
<goal>bundle</goal>
</goals>
</execution>
</executions>
<configuration>
<resourcesDirectory>${project.build.outputDirectory}</resourcesDirectory>
<includes>
<include>META-INF/LICENSE.txt</include>
<include>META-INF/NOTICE.txt</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -141,8 +145,34 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
}

if (checksumBlockData != null) {
List<ChunkInfo> currentChunks = getContainerBlockData().getChunksList();

// For the same BlockGroupLength, we need to find the larger value of Block DataSize.
// This is because we do not send empty chunks to the DataNode, so the larger value is more accurate.
Map<Long, Optional<BlockData>> maxDataSizeByGroup = Arrays.stream(blockData)
.filter(Objects::nonNull)
.collect(Collectors.groupingBy(BlockData::getBlockGroupLength,
Collectors.maxBy(Comparator.comparingLong(BlockData::getSize))));
BlockData maxBlockData = maxDataSizeByGroup.get(blockGroupLength).get();

// When calculating the checksum size,
// We need to consider both blockGroupLength and the actual size of blockData.
//
// We use the smaller value to determine the size of the ChunkList.
//
// 1. In most cases, blockGroupLength is equal to the size of blockData.
// 2. Occasionally, blockData is not fully filled; if a chunk is empty,
// it is not sent to the DN, resulting in blockData size being smaller than blockGroupLength.
// 3. In cases with 'dirty data',
// if an error occurs when writing to the EC-Stripe (e.g., DN reports Container Closed),
// and the length confirmed with OM is smaller, blockGroupLength may be smaller than blockData size.
long blockDataSize = Math.min(maxBlockData.getSize(), blockGroupLength);
int chunkSize = (int) Math.ceil(((double) blockDataSize / repConfig.getEcChunkSize()));
List<ChunkInfo> checksumBlockDataChunks = checksumBlockData.getChunks();
if (chunkSize > 0) {
checksumBlockDataChunks = checksumBlockData.getChunks().subList(0, chunkSize);
}

List<ChunkInfo> currentChunks = getContainerBlockData().getChunksList();

Preconditions.checkArgument(
currentChunks.size() == checksumBlockDataChunks.size(),
Expand Down Expand Up @@ -268,7 +298,7 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
throw ce;
});
} catch (IOException | ExecutionException e) {
throw new IOException(EXCEPTION_MSG + e.toString(), e);
throw new IOException(EXCEPTION_MSG + e, e);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,4 +902,17 @@ public static HddsProtos.UUID toProtobuf(UUID uuid) {
? Thread.currentThread().getStackTrace()
: null;
}

/**
* Logs a warning to report that the class is not closed properly.
*/
public static void reportLeak(Class<?> clazz, String stackTrace, Logger log) {
String warning = String.format("%s is not closed properly", clazz.getSimpleName());
if (stackTrace != null && LOG.isDebugEnabled()) {
String debugMessage = String.format("%nStackTrace for unclosed instance: %s",
stackTrace);
warning = warning.concat(debugMessage);
}
log.warn(warning);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.conf;

import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;

/**
* Utilities for Ratis configurations.
*/
public class RatisConfUtils {
/** For {@link GrpcConfigKeys}. */
public static class Grpc {
/** For setting {@link GrpcConfigKeys#setMessageSizeMax(RaftProperties, SizeInBytes)}. */
public static void setMessageSizeMax(RaftProperties properties, int max) {
Preconditions.assertTrue(max > 0, () -> "max = " + max + " <= 0");

final long logAppenderBufferByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSize();
Preconditions.assertTrue(max >= logAppenderBufferByteLimit,
() -> "max = " + max + " < logAppenderBufferByteLimit = " + logAppenderBufferByteLimit);

// Need an 1MB gap; see RATIS-2135
GrpcConfigKeys.setMessageSizeMax(properties, SizeInBytes.valueOf(max + SizeInBytes.ONE_MB.getSize()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.Proto3Codec;
import org.apache.hadoop.ozone.OzoneConsts;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -296,4 +297,14 @@ public void appendTo(StringBuilder sb) {
sb.append(", size=").append(size);
sb.append("]");
}

public long getBlockGroupLength() {
String lenStr = getMetadata()
.get(OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK);
// If we don't have the length, then it indicates a problem with the stripe.
// All replica should carry the length, so if it is not there, we return 0,
// which will cause us to set the length of the block to zero and not
// attempt to reconstruct it.
return (lenStr == null) ? 0 : Long.parseLong(lenStr);
}
}
11 changes: 10 additions & 1 deletion hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2097,7 +2097,7 @@
</property>

<property>
<name>ozone.om.ratis.server.leaderelection.pre-vote </name>
<name>ozone.om.ratis.server.leaderelection.pre-vote</name>
<value>true</value>
<tag>OZONE, OM, RATIS, MANAGEMENT</tag>
<description>Enable/disable OM HA leader election pre-vote phase.
Expand All @@ -2114,6 +2114,15 @@
</description>
</property>

<property>
<name>ozone.om.ratis.server.close.threshold</name>
<value>60s</value>
<tag>OZONE, OM, RATIS</tag>
<description>
Raft Server will close if JVM pause longer than the threshold.
</description>
</property>

<property>
<name>ozone.om.ratis.snapshot.dir</name>
<value/>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.conf;

import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.SizeInBytes;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test {@link RatisConfUtils}.
*/
public class TestRatisConfUtils {
private static final Logger LOG = LoggerFactory.getLogger(TestRatisConfUtils.class);

@Test
void testGrpcSetMessageSizeMax() {
final RaftProperties properties = new RaftProperties();

final int logAppenderBufferByteLimit = 1000;

// setMessageSizeMax without setBufferByteLimit
Assertions.assertThrows(IllegalStateException.class,
() -> RatisConfUtils.Grpc.setMessageSizeMax(properties, logAppenderBufferByteLimit));

RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties, SizeInBytes.valueOf(logAppenderBufferByteLimit));

// setMessageSizeMax with a value smaller than logAppenderBufferByteLimit
Assertions.assertThrows(IllegalStateException.class,
() -> RatisConfUtils.Grpc.setMessageSizeMax(properties, logAppenderBufferByteLimit - 1));

// setMessageSizeMax with the correct logAppenderBufferByteLimit
RatisConfUtils.Grpc.setMessageSizeMax(properties, logAppenderBufferByteLimit);

final SizeInBytes max = GrpcConfigKeys.messageSizeMax(properties, LOG::info);
Assertions.assertEquals(SizeInBytes.ONE_MB.getSize(), max.getSize() - logAppenderBufferByteLimit);
}
}
Loading

0 comments on commit 3cc2a08

Please sign in to comment.