diff --git a/leader/build.gradle b/leader/build.gradle index 27bedf3a..da29397e 100644 --- a/leader/build.gradle +++ b/leader/build.gradle @@ -43,10 +43,10 @@ dependencies { compile group: 'com.github.nscala-time', name: 'nscala-time_2.11', version: '2.2.0' compile group: 'org.apache.curator', name:'curator-test', version:'2.9.1' compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.3' - compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.3' - compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.3' - compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.3' - compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.3' + compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.4' + compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.4' + compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.4' + compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.4' compile group: 'org.eclipse.jetty', name: 'jetty-plus', version: '9.2.19.v20160908' compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.2.19.v20160908' compile group: 'org.eclipse.jetty', name: 'jetty-http', version: '9.2.19.v20160908' diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java index 731efb8b..e21b8964 100644 --- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java +++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java @@ -128,6 +128,7 @@ private void run(JobOpts opts, String[] args) throws Exception { // Setup local ama folder on hdfs. try { + if (!fs.exists(jarPathQualified)) { File home = new File(opts.home); fs.mkdirs(jarPathQualified); @@ -139,6 +140,7 @@ private void run(JobOpts opts, String[] args) throws Exception { // setup frameworks FrameworkProvidersFactory frameworkFactory = FrameworkProvidersFactory.apply(opts.env, config); for (String group : frameworkFactory.groups()) { + System.out.println("===> setting up " + group); FrameworkSetupProvider framework = frameworkFactory.getFramework(group); //creating a group folder @@ -153,9 +155,11 @@ private void run(JobOpts opts, String[] args) throws Exception { } } } catch (IOException e) { + System.out.println("===>" + e.getMessage()); LOGGER.error("Error uploading ama folder to HDFS.", e); exit(3); } catch (NullPointerException ne) { + System.out.println("===>" + ne.getMessage()); LOGGER.error("No files in home dir.", ne); exit(4); } @@ -234,7 +238,7 @@ private void run(JobOpts opts, String[] args) throws Exception { reportBarrier.setBarrier(); reportBarrier.waitOnBarrier(); - String address = new String( client.getData().forPath("/" + newJobId + "/broker")); + String address = new String(client.getData().forPath("/" + newJobId + "/broker")); System.out.println("===> " + address); setupReportListener(address); diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala b/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala index 0fe378ac..7502698b 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala @@ -15,19 +15,22 @@ class SparkSetupProvider extends FrameworkSetupProvider { private var env: String = _ private var conf: ClusterConfig = _ - private val runnersResources = mutable.Map[String,Array[File]]() - private var execData: ExecData = _ - private var sparkExecConfigurations = mutable.Map[String, Any]() + private val runnersResources = mutable.Map[String, Array[File]]() + //private var execData: ExecData = _ + private lazy val sparkExecConfigurations: mutable.Map[String, Any] = loadSparkConfig - override def init(env: String, conf: ClusterConfig): Unit = { - this.env = env - this.conf = conf - this.execData = DataLoader.getExecutorData(env, conf) + private def loadSparkConfig: mutable.Map[String, Any] = { + val execData = DataLoader.getExecutorData(env, conf) val sparkExecConfigurationsurations = execData.configurations.get("spark") if (sparkExecConfigurationsurations.isEmpty) { throw new Exception(s"Spark configuration files could not be loaded for the environment ${env}") } - this.sparkExecConfigurations = sparkExecConfigurations ++ sparkExecConfigurationsurations.get + collection.mutable.Map(sparkExecConfigurationsurations.get.toSeq: _*) + } + + override def init(env: String, conf: ClusterConfig): Unit = { + this.env = env + this.conf = conf runnersResources += "scala" -> Array.empty[File] runnersResources += "sql" -> Array.empty[File] diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala index 8af5892e..33a45bac 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala @@ -256,7 +256,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { val commands: List[String] = List( "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && ", s"/bin/bash ${config.spark.home}/bin/load-spark-env.sh && ", - s"java -cp executor.jar:${config.spark.home}/jars/*:${config.spark.home}/conf/:${config.YARN.hadoopHomeDir}/conf/ " + + s"java -cp spark/jars/*:executor.jar:${config.spark.home}/jars/*:${config.spark.home}/conf/:${config.YARN.hadoopHomeDir}/conf/ " + "-Xmx1G " + "-Dscala.usejavacp=true " + "-Dhdp.version=2.6.1.0-129 " + diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala index 9f4c3f9e..70da38eb 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala @@ -106,7 +106,7 @@ class YarnRMCallbackHandler(nmClient: NMClientAsync, val ctx = Records.newRecord(classOf[ContainerLaunchContext]) val command = s"""$awsEnv env AMA_NODE=${sys.env("AMA_NODE")} | env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/dist/spark-${config.Webserver.sparkVersion}.tgz - | java -cp executor-*-all.jar:spark-${config.Webserver.sparkVersion}/lib/* + | java -cp executor.jar:spark-${config.Webserver.sparkVersion}/lib/* | -Dscala.usejavacp=true | -Djava.library.path=/usr/lib org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher | ${jobManager.jobId} ${config.master} ${actionData.name} ${gson.toJson(taskData)} ${gson.toJson(execData)}""".stripMargin diff --git a/leader/src/main/scripts/ama-start-yarn.sh b/leader/src/main/scripts/ama-start-yarn.sh index 05733410..c437246e 100755 --- a/leader/src/main/scripts/ama-start-yarn.sh +++ b/leader/src/main/scripts/ama-start-yarn.sh @@ -129,9 +129,9 @@ fi if [ "$FORCE_BIN" = true ] ; then echo "FORCE: Deleting and re-creating /apps/amaterasu folder" eval "hdfs dfs -rm -R -skipTrash /apps/amaterasu" - eval "hdfs dfs -mkdir /apps/amaterasu/" - eval "hdfs dfs -chmod -R 777 /apps/amaterasu/" - eval "hdfs dfs -copyFromLocal ${BASEDIR}/* /apps/amaterasu/" + #eval "hdfs dfs -mkdir /apps/amaterasu/" + #eval "hdfs dfs -chmod -R 777 /apps/amaterasu/" + #eval "hdfs dfs -copyFromLocal ${BASEDIR}/* /apps/amaterasu/" fi eval $CMD | grep "===>"