Skip to content

Commit

Permalink
[ADH-5273] Handle different path schemes properly in actions
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrulya-exe committed Nov 7, 2024
1 parent a7ccba3 commit dfac413
Show file tree
Hide file tree
Showing 101 changed files with 1,953 additions and 2,360 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,15 @@ public void init(Map<String, String> args) {

protected abstract void execute() throws Exception;

/**
 * Hook executed before {@link #execute()}: records the action start time
 * and appends a start message to the action log.
 *
 * @throws Exception if any pre-run step fails; subclasses overriding this
 *     hook may throw their own checked exceptions
 */
protected void preRun() throws Exception {
  setStartTime();
  String startMessage =
      String.format("Action starts at %s", Utils.getFormatedCurrentTime());
  appendLog(startMessage);
}

public final void run() {
try {
setStartTime();
preRun();
execute();
successful = true;
} catch (Throwable t) {
Expand Down
45 changes: 3 additions & 42 deletions smart-common/src/main/java/org/smartdata/SmartConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,16 @@
*/
package org.smartdata;

import java.util.HashMap;
import java.util.Map;

public class SmartConstants {

public static final String SMART_HDFS_LAST_INOTIFY_TXID =
"smart_hadoop_last_inotify_txid";
"smart_hadoop_last_inotify_txid";

public static final String SMART_CLIENT_PROTOCOL_NAME =
"org.smartdata.protocol.SmartClientProtocol";

public static final String SMART_ADMIN_PROTOCOL_NAME =
"org.smartdata.protocol.SmartAdminProtocol";
"org.smartdata.protocol.SmartClientProtocol";

public static final String SMART_CLIENT_DISABLED_ID_FILE =
"/tmp/SMART_CLIENT_DISABLED_ID_FILE";
"/tmp/SMART_CLIENT_DISABLED_ID_FILE";

public static final String NUMBER_OF_SMART_AGENT =
"number_of_smart_agent_in_agents_file";
Expand All @@ -47,39 +41,6 @@ public class SmartConstants {

public static final String AGENT_CMDLET_SERVICE_NAME = "AgentCmdletService";

public static final byte STORAGE_POLICY_UNDEF_ID = 0;
public static final String STORAGE_POLICY_UNDEF_NAME = "UNDEF";

public static final byte STORAGE_POLICY_COLD_ID = 2;
public static final String STORAGE_POLICY_COLD_NAME = "COLD";

public static final byte STORAGE_POLICY_WARM_ID = 5;
public static final String STORAGE_POLICY_WARM_NAME = "WARM";

public static final byte STORAGE_POLICY_HOT_ID = 7;
public static final String STORAGE_POLICY_HOT_NAME = "HOT";

public static final byte STORAGE_POLICY_ONE_SSD_ID = 10;
public static final String STORAGE_POLICY_ONE_SSD_NAME = "ONE_SSD";

public static final byte STORAGE_POLICY_ALL_SSD_ID = 12;
public static final String STORAGE_POLICY_ALL_SSD_NAME = "ALL_SSD";

public static final byte STORAGE_POLICY_LAZY_PERSIST_ID = 15;
public static final String STORAGE_POLICY_LAZY_PERSIST_NAME = "LAZY_PERSIST";

public static final Map<Byte, String> STORAGE_POLICY_MAP = new HashMap<>();

static {
STORAGE_POLICY_MAP.put(STORAGE_POLICY_UNDEF_ID, STORAGE_POLICY_UNDEF_NAME);
STORAGE_POLICY_MAP.put(STORAGE_POLICY_COLD_ID, STORAGE_POLICY_COLD_NAME);
STORAGE_POLICY_MAP.put(STORAGE_POLICY_WARM_ID, STORAGE_POLICY_WARM_NAME);
STORAGE_POLICY_MAP.put(STORAGE_POLICY_HOT_ID, STORAGE_POLICY_HOT_NAME);
STORAGE_POLICY_MAP.put(STORAGE_POLICY_ONE_SSD_ID, STORAGE_POLICY_ONE_SSD_NAME);
STORAGE_POLICY_MAP.put(STORAGE_POLICY_ALL_SSD_ID, STORAGE_POLICY_ALL_SSD_NAME);
STORAGE_POLICY_MAP.put(STORAGE_POLICY_LAZY_PERSIST_ID, STORAGE_POLICY_LAZY_PERSIST_NAME);
}

public static final String SMART_FILE_CHECKSUM_XATTR_NAME = "user.checksum";

public static final String FS_HDFS_IMPL = "fs.hdfs.impl";
Expand Down
37 changes: 35 additions & 2 deletions smart-common/src/main/java/org/smartdata/utils/PathUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
*/
package org.smartdata.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;
import java.util.Optional;

import static org.smartdata.utils.ConfigUtil.toRemoteClusterConfig;

public class PathUtil {
private static final String DIR_SEP = "/";
private static final String HDFS_SCHEME = "hdfs";
private static final String[] GLOBS = new String[] {
"*", "?"
};
Expand Down Expand Up @@ -66,6 +70,10 @@ public static boolean pathStartsWith(String path, String prefixToCheck) {
.startsWith(addPathSeparator(prefixToCheck));
}

/**
 * Checks whether the given string path denotes an absolute remote path.
 * Convenience overload that delegates to {@link #isAbsoluteRemotePath(Path)}.
 *
 * @param path raw path string; must be non-null (a null path makes
 *     {@code new Path(null)} throw, same as the Path overload's contract)
 * @return true if the path has a remote scheme
 */
public static boolean isAbsoluteRemotePath(String path) {
  Path hadoopPath = new Path(path);
  return isAbsoluteRemotePath(hadoopPath);
}

// todo replace 'stringPath.startsWith("hdfs")' calls with this method
public static boolean isAbsoluteRemotePath(Path path) {
return Optional.ofNullable(path)
Expand All @@ -77,7 +85,32 @@ public static boolean isAbsoluteRemotePath(Path path) {
/**
 * Checks whether the given URI denotes an absolute remote path.
 *
 * @param uri URI to inspect; null is treated as "not remote"
 * @return true if the URI carries the {@code hdfs} scheme
 */
public static boolean isAbsoluteRemotePath(URI uri) {
// NOTE(review): only the "hdfs" scheme counts as remote here; other remote
// schemes (e.g. s3a, ofs) return false — confirm this is intended given the
// commit's goal of handling different path schemes.
return Optional.ofNullable(uri)
.map(URI::getScheme)
.filter(HDFS_SCHEME::equals)
.isPresent();
}

/**
 * Extracts the URI scheme from a string path.
 *
 * @param path path string in URI form, e.g. {@code hdfs://host/dir}; may be null
 * @return the scheme if present, otherwise {@link Optional#empty()};
 *     a null path also yields empty
 * @throws IllegalArgumentException if the path violates RFC 2396
 *     (propagated from {@link URI#create(String)})
 */
public static Optional<String> getScheme(String path) {
  if (path == null) {
    return Optional.empty();
  }
  URI pathUri = URI.create(path);
  return Optional.ofNullable(pathUri.getScheme());
}

/**
 * Extracts the URI scheme from a Hadoop {@link Path}.
 *
 * @param path path to inspect; may be null
 * @return the scheme if present, otherwise {@link Optional#empty()};
 *     a null path also yields empty
 */
public static Optional<String> getScheme(Path path) {
  if (path == null) {
    return Optional.empty();
  }
  URI pathUri = path.toUri();
  return Optional.ofNullable(pathUri.getScheme());
}

/**
 * Resolves the {@link FileSystem} for a remote path given as a string.
 * Convenience overload that delegates to {@link #getRemoteFileSystem(Path)}.
 *
 * @param path raw path string, e.g. {@code hdfs://host/dir}
 * @return the filesystem serving the path
 * @throws IOException if the filesystem cannot be instantiated
 */
public static FileSystem getRemoteFileSystem(String path) throws IOException {
  Path hadoopPath = new Path(path);
  return getRemoteFileSystem(hadoopPath);
}

// todo we use default HDFS config, add mechanism to provide
// hdfs config paths for remote clusters
/**
 * Resolves the {@link FileSystem} serving the given path, using a default
 * {@link Configuration} adapted for remote-cluster access via
 * {@code toRemoteClusterConfig}.
 *
 * @param path path whose filesystem to resolve
 * @return the filesystem serving the path
 * @throws IOException if the filesystem cannot be instantiated
 */
public static FileSystem getRemoteFileSystem(Path path) throws IOException {
  Configuration remoteClusterConfig = toRemoteClusterConfig(new Configuration());
  return path.getFileSystem(remoteClusterConfig);
}

/**
 * Returns the scheme- and authority-free path component of the given
 * Hadoop {@link Path} (e.g. {@code hdfs://host:8020/a/b} becomes {@code /a/b}).
 *
 * @param path path to strip; must be non-null
 * @return the raw path component of the path's URI
 */
public static String getRawPath(Path path) {
  URI pathUri = path.toUri();
  return pathUri.getPath();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.smartdata.action.ActionRegistry;
import org.smartdata.action.SmartAction;
import org.smartdata.hdfs.action.HdfsAction;
import org.smartdata.hdfs.client.CachingDfsClientProvider;
import org.smartdata.hdfs.client.DfsClientProvider;
import org.smartdata.hdfs.client.CachingLocalFileSystemProvider;
import org.smartdata.hdfs.client.LocalFileSystemProvider;
import org.smartdata.model.LaunchAction;
import org.smartdata.protocol.message.LaunchCmdlet;

Expand All @@ -40,15 +40,15 @@ public class CmdletFactory implements Closeable {
static final Logger LOG = LoggerFactory.getLogger(CmdletFactory.class);

private final SmartContext smartContext;
private final DfsClientProvider dfsClientProvider;
private final LocalFileSystemProvider localFileSystemProvider;

public CmdletFactory(SmartContext smartContext) {
this(smartContext, new CachingDfsClientProvider(smartContext.getConf()));
this(smartContext, new CachingLocalFileSystemProvider(smartContext.getConf()));
}

public CmdletFactory(SmartContext smartContext, DfsClientProvider dfsClientProvider) {
public CmdletFactory(SmartContext smartContext, LocalFileSystemProvider localFileSystemProvider) {
this.smartContext = smartContext;
this.dfsClientProvider = dfsClientProvider;
this.localFileSystemProvider = localFileSystemProvider;
}

public Cmdlet createCmdlet(LaunchCmdlet launchCmdlet) throws ActionException {
Expand All @@ -73,15 +73,15 @@ public SmartAction createAction(long cmdletId, boolean isLastAction, LaunchActio
smartAction.init(launchAction.getArgs());
smartAction.setActionId(launchAction.getActionId());
if (smartAction instanceof HdfsAction) {
setDfsClient((HdfsAction) smartAction);
setLocalFileSystem((HdfsAction) smartAction);
}
return smartAction;
}

private void setDfsClient(HdfsAction action) throws ActionException {
private void setLocalFileSystem(HdfsAction action) throws ActionException {
try {
action.setDfsClient(
dfsClientProvider.provide(smartContext.getConf(), action.dfsClientType())
action.setLocalFileSystem(
localFileSystemProvider.provide(smartContext.getConf(), action.localFsType())
);
} catch (IOException exception) {
LOG.error("smartAction aid={} setDfsClient error", action.getActionId(), exception);
Expand All @@ -92,7 +92,7 @@ private void setDfsClient(HdfsAction action) throws ActionException {
@Override
public void close() {
try {
dfsClientProvider.close();
localFileSystemProvider.close();
} catch (IOException exception) {
String errorMessage = "Error closing DFS client provider";
log.error(errorMessage, exception);
Expand Down
4 changes: 4 additions & 0 deletions smart-hadoop-support/smart-hadoop-3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@
<artifactId>smart-hadoop-common</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class AddErasureCodingPolicy extends HdfsAction {
public static final String DATA_UNITS_NUM = "-dataNum";
public static final String PARITY_UNITS_NUM = "-parityNum";
public static final String CELL_SIZE = "-cellSize";
private String policyName;

private String codecName;
private int numDataUnits;
private int numParityUnits;
Expand All @@ -54,7 +54,7 @@ public class AddErasureCodingPolicy extends HdfsAction {
public void init(Map<String, String> args) {
super.init(args);
if (args.get(POLICY_NAME) != null && !args.get(POLICY_NAME).isEmpty()) {
this.policyName = args.get(POLICY_NAME);
String policyName = args.get(POLICY_NAME);
String[] policySchema = policyName.split("-");
if (policySchema.length != 4) {
return;
Expand All @@ -81,28 +81,33 @@ public void init(Map<String, String> args) {

@Override
public void execute() throws Exception {
if (codecName == null || numDataUnits <= 0 || numParityUnits <= 0 ||
cellSize <= 0 || cellSize % 1024 != 0) {
throw new ActionException("Illegal EC policy Schema! " +
"A valid codec name should be given, " +
"the dataNum, parityNum and cellSize should be positive and " +
"the cellSize should be divisible by 1024.");
}
validateArgs();

ECSchema ecSchema = new ECSchema(codecName, numDataUnits, numParityUnits);
ErasureCodingPolicy ecPolicy = new ErasureCodingPolicy(ecSchema, cellSize);
AddErasureCodingPolicyResponse addEcResponse =
dfsClient.addErasureCodingPolicies(new ErasureCodingPolicy[]{ecPolicy})[0];
if (addEcResponse.isSucceed()) {
appendLog(String.format("EC policy named %s is added successfully!",
addEcResponse.getPolicy().getName()));
} else {
appendLog(String.format("Failed to add the given EC policy!"));
localFileSystem.addErasureCodingPolicies(new ErasureCodingPolicy[]{ecPolicy})[0];

if (!addEcResponse.isSucceed()) {
appendLog("Failed to add the given EC policy!");
throw new ActionException(addEcResponse.getErrorMsg());
} else {
appendLog("EC policy is added successfully: " + addEcResponse.getPolicy().getName());
}
}

@Override
public DfsClientType dfsClientType() {
return DfsClientType.DEFAULT_HDFS;
public FsType localFsType() {
return FsType.DEFAULT_HDFS;
}

/**
 * Validates the EC policy schema fields parsed in {@code init}.
 *
 * @throws ActionException if the codec name is missing, either unit count
 *     is non-positive, or the cell size is non-positive or not a multiple
 *     of 1024
 */
private void validateArgs() throws ActionException {
  boolean codecMissing = codecName == null;
  boolean unitsInvalid = numDataUnits <= 0 || numParityUnits <= 0;
  boolean cellSizeInvalid = cellSize <= 0 || cellSize % 1024 != 0;
  if (codecMissing || unitsInvalid || cellSizeInvalid) {
    throw new ActionException("Illegal EC policy Schema! " +
        "A valid codec name should be given, " +
        "the dataNum, parityNum and cellSize should be positive and " +
        "the cellSize should be divisible by 1024.");
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.smartdata.hdfs.action;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.smartdata.action.annotation.ActionSignature;

Expand All @@ -35,29 +35,29 @@
public class CheckErasureCodingPolicy extends HdfsAction {
public static final String RESULT_OF_NULL_EC_POLICY =
"The EC policy is replication.";
private String srcPath;

private Path srcPath;

@Override
public void init(Map<String, String> args) {
super.init(args);
this.srcPath = args.get(HdfsAction.FILE_PATH);
this.srcPath = getPathArg(FILE_PATH);
}

@Override
public void execute() throws Exception {
if (StringUtils.isBlank(srcPath)) {
throw new IllegalArgumentException("File parameter is missing! ");
}
validateNonEmptyArg(FILE_PATH);

String result = Optional.ofNullable(dfsClient.getErasureCodingPolicy(srcPath))
String result = Optional.ofNullable(
localFileSystem.getErasureCodingPolicy(srcPath))
.map(ErasureCodingPolicy::toString)
.orElse(RESULT_OF_NULL_EC_POLICY);

appendResult(result);
}

@Override
public DfsClientType dfsClientType() {
return DfsClientType.DEFAULT_HDFS;
public FsType localFsType() {
return FsType.DEFAULT_HDFS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)
public class DisableErasureCodingPolicy extends HdfsAction {
public static final String EC_POLICY_NAME = "-policy";

private String policyName;

@Override
Expand All @@ -41,12 +42,12 @@ public void init(Map<String, String> args) {

@Override
public void execute() throws Exception {
dfsClient.disableErasureCodingPolicy(policyName);
localFileSystem.disableErasureCodingPolicy(policyName);
appendLog(String.format("The EC policy named %s is disabled!", policyName));
}

@Override
public DfsClientType dfsClientType() {
return DfsClientType.DEFAULT_HDFS;
public FsType localFsType() {
return FsType.DEFAULT_HDFS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)
public class EnableErasureCodingPolicy extends HdfsAction {
public static final String EC_POLICY_NAME = "-policy";

private String policyName;

@Override
Expand All @@ -41,12 +42,12 @@ public void init(Map<String, String> args) {

@Override
public void execute() throws Exception {
dfsClient.enableErasureCodingPolicy(policyName);
localFileSystem.enableErasureCodingPolicy(policyName);
appendLog(String.format("The EC policy named %s is enabled!", policyName));
}

@Override
public DfsClientType dfsClientType() {
return DfsClientType.DEFAULT_HDFS;
public FsType localFsType() {
return FsType.DEFAULT_HDFS;
}
}
Loading

0 comments on commit dfac413

Please sign in to comment.