Many times in Data Science, 80% of the "science" is simply organizing the data for analysis. To handle the issue of distributed resource allocation, the Apache Mesos project was created to abstract away machine information, exposing only the important resources to data frameworks.
The purpose of this blog post is to demonstrate how an Apache Mesos framework is written in Java and how it can be used to dynamically provision new processes as resources become available, regardless of the underlying architecture.
Mesos provides an elastic interface for adding machines to a cluster, each of which handles the execution of contained user environments and code. Machines can be added to the system by simply registering them with Mesos, after which they are automatically connected to your system.
# What is Mesos?
Mesos is a distributed systems kernel, a low level framework for managing resources across many machines.
It consists of a master daemon that manages slave daemons running on each cluster node, and Mesos applications (also called frameworks) that run tasks on these slaves.
In turn, a framework running on top of Mesos consists of two components: a scheduler that registers with the master to be offered resources, and an executor process that is launched on slave nodes to run the framework’s tasks.
While the master determines how many resources are offered to each framework, the frameworks' schedulers select which of the offered resources to use. When a framework accepts offered resources, it passes to Mesos a description of the tasks it wants to run on them. In turn, Mesos launches the tasks on the corresponding slaves' executor processes.
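The offer cycle described above can be sketched in plain Java without the Mesos library. This is only an illustrative model: the `OfferCycleSketch` and `Offer` classes and the per-task requirements are assumptions made for this sketch, not part of the Mesos API.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of two-level scheduling; none of these types are Mesos classes. */
final class OfferCycleSketch {

    /** Hypothetical stand-in for a resource offer made on behalf of one slave. */
    static final class Offer {
        final String slave;
        final double cpus;
        final double memMb;

        Offer(String slave, double cpus, double memMb) {
            this.slave = slave;
            this.cpus = cpus;
            this.memMb = memMb;
        }
    }

    /**
     * Scheduler side: the master decides what is offered, the framework
     * decides which offers are large enough to use for one task.
     */
    static List<String> acceptOffers(List<Offer> offers, double cpusNeeded, double memNeededMb) {
        List<String> accepted = new ArrayList<>();
        for (Offer offer : offers) {
            if (offer.cpus >= cpusNeeded && offer.memMb >= memNeededMb) {
                accepted.add(offer.slave);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<Offer> offers = new ArrayList<>();
        offers.add(new Offer("slave-1", 8.0, 4096.0));
        offers.add(new Offer("slave-2", 0.5, 64.0));
        // Only slave-1 can hold a task needing 1 CPU and 128 MB.
        System.out.println(acceptOffers(offers, 1.0, 128.0)); // prints [slave-1]
    }
}
```

In a real framework the accepted offers would be answered with task descriptions, and the remaining offers would be declined so that Mesos can offer them to other frameworks.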
While this example is running a simple HTTP server, more complex systems that utilize containers or distribution can be orchestrated by a Mesos framework.
Mesos can be used to connect Akka Supervisor processes to our system. Akka is a distributed message passing framework that is well suited for data analysis. When a new machine is provisioned, the serialized connection information is passed to the Executor process on that machine. Based on the provided resources, a Supervisor spawns a number of "workers" that run user code within Docker containers.
We have found this architecture to be elastic and maintainable, and it allows a new data processing framework to share resources with other Big Data frameworks such as Hadoop, Kafka, and Spark.
To create a Mesos framework, the Scheduler and Executor Java interfaces must be implemented.
The Mesos Scheduler connects to the Mesos master and is responsible for handling "offers" periodically. The offer contains the host info of a slave node on the Mesos Cluster, along with resources available for use by the Scheduler.
A resource takes the following form:
{ name: "cpus" type: SCALAR scalar { value: 8.0 } role: "*" }
Thus, the responsibility of a Mesos Scheduler is to accept offers and spawn tasks to run on the machines offering the resources. The following code spawns tasks when given new resources.
public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
    for (Protos.Offer offer : offers) {
        List<Protos.TaskInfo> tasks = new ArrayList<>();

        // Generate a unique task ID
        Protos.TaskID taskId = Protos.TaskID.newBuilder()
            .setValue(Integer.toString(taskIDGenerator.incrementAndGet())).build();

        // Create the task
        Protos.TaskInfo task = Protos.TaskInfo.newBuilder()
            .setName("task " + taskId.getValue())
            .setTaskId(taskId)
            .setSlaveId(offer.getSlaveId())
            .setExecutor(MesosExecutor.getExecutorCommand())
            .addResources(MesosHelper.scalarResouce("cpus", 1))
            .addResources(MesosHelper.scalarResouce("mem", 128))
            .build();
        tasks.add(task);

        // Launch the task against this offer (must stay inside the loop, where offer is in scope)
        driver.launchTasks(Lists.newArrayList(offer.getId()), tasks);
    }
}
For each offer, we instruct the Executor running on that machine to spawn a task that uses one CPU and 128MB of memory. We can also set the commands to run from the scheduler, or even set a Docker container to be used as the execution environment.
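The scheduler above launches a single task per offer. If you wanted to fill an offer instead, the number of tasks that fit is bounded by the scarcer of the two resources. The sketch below shows that arithmetic in plain Java; the `OfferPacking` class and its method are hypothetical helpers for illustration and are not part of the example repository or the Mesos API.

```java
/** Hypothetical helper: how many fixed-size tasks fit into one offer. */
final class OfferPacking {

    /** Returns the number of whole tasks the offered CPU and memory can hold. */
    static int tasksThatFit(double offeredCpus, double offeredMemMb,
                            double cpusPerTask, double memPerTaskMb) {
        if (cpusPerTask <= 0 || memPerTaskMb <= 0) {
            throw new IllegalArgumentException("per-task requirements must be positive");
        }
        int byCpu = (int) Math.floor(offeredCpus / cpusPerTask);
        int byMem = (int) Math.floor(offeredMemMb / memPerTaskMb);
        // The scarcer resource limits the task count; never report a negative count.
        return Math.max(0, Math.min(byCpu, byMem));
    }

    public static void main(String[] args) {
        // An offer of 8 CPUs and 512 MB, with tasks of 1 CPU and 128 MB each:
        // memory is the limiting resource.
        System.out.println(tasksThatFit(8.0, 512.0, 1.0, 128.0)); // prints 4
    }
}
```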
One important portion of this code is the following call:
.setExecutor(MesosExecutor.getExecutorCommand())
This call instructs the task to spawn a MesosExecutor process on the machine running the task, if one is needed. The name of the execution framework generated by this helper function directs the task to the proper location.
The script that runs the executor calls the main method of the MesosExecutor class. This allows your MesosExecutor to run when a new node connects to Mesos.
#!/bin/sh
echo "starting Executor"
export MESOS_NATIVE_JAVA_LIBRARY=/usr/local/lib/libmesos-0.20.1.dylib
cd /Users/sjarvie/mesos_example/
mvn clean compile exec:java \
-DskipTests \
-Pmvn \
-Dexec.mainClass="org.zillabyte.MesosExecutor"
Once we have directed the task to its executor, the Executor can utilize these resources for any arbitrary purpose.
Our framework has now allocated a task with CPU and memory to be executed. The Executor process can use this task notification to perform a task. In this example, a simple Python server will be run as the task.
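As a rough idea of how such a server could be started from Java using only the standard library, the sketch below builds the command and hands it to `ProcessBuilder`. The class name, the use of `python3 -m http.server`, and the port parameter are assumptions made for this sketch; the example repository may build and run its command differently.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of launching Python's built-in HTTP server as a child process. */
final class PythonServerSketch {

    /** Builds the argument list (assumes python3 is on the PATH). */
    static List<String> serverCommand(int port) {
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("invalid port: " + port);
        }
        return Arrays.asList("python3", "-m", "http.server", Integer.toString(port));
    }

    /** Starts the server; the child shares this process's stdout and stderr. */
    static Process start(int port) throws IOException {
        return new ProcessBuilder(serverCommand(port)).inheritIO().start();
    }

    public static void main(String[] args) {
        // Only print the command here; call start(8080) to actually run the server.
        System.out.println(String.join(" ", serverCommand(8080))); // prints python3 -m http.server 8080
    }
}
```

Calling `waitFor()` on the returned `Process` blocks until the server exits, and its exit code could be used to decide whether the task finished or failed.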
To run a task, the launchTask method must be implemented. It is called when a new task is spawned. The process run by this method is limited by the amount of resources allocated to the task.
public void launchTask(final ExecutorDriver driver, final TaskInfo task) {
    TaskStatus.Builder status = TaskStatus.newBuilder()
        .setTaskId(task.getTaskId());

    // Notify Mesos that the task is now running
    status.setState(TaskState.TASK_RUNNING);
    driver.sendStatusUpdate(status.build());

    // Run the task; execute() blocks until the process exits
    try {
        new DefaultExecutor().execute(pythonServerCommand());
        status.setState(TaskState.TASK_FINISHED);
    } catch (IOException e) {
        status.setState(TaskState.TASK_FAILED);
        e.printStackTrace();
    }

    // Report the final task status
    driver.sendStatusUpdate(status.build());
}
In this code, we spawn a Python server process that serves requests on port 8080. We pass the status of the task to the MesosScheduler via the driver.sendStatusUpdate method. This allows our framework to intelligently manage its resources on a per-task basis.
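On the scheduler side, Mesos delivers these updates to the `statusUpdate` callback of the Scheduler interface. The sketch below models the bookkeeping a scheduler might do with them, in plain Java. `TaskTrackerSketch`, its methods, and the string task IDs are assumptions for illustration; the real callback receives a `Protos.TaskStatus`, and the real state enum has more values than the three used in this post.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-task bookkeeping driven by status updates. */
final class TaskTrackerSketch {

    /** Subset of task states used in this post; a stand-in for Protos.TaskState. */
    enum State { TASK_RUNNING, TASK_FINISHED, TASK_FAILED }

    private final Map<String, State> states = new HashMap<>();

    /** Records the latest state reported for a task. */
    void statusUpdate(String taskId, State state) {
        states.put(taskId, state);
    }

    /** Counts tasks that are still holding resources. */
    int runningCount() {
        int running = 0;
        for (State state : states.values()) {
            if (state == State.TASK_RUNNING) {
                running++;
            }
        }
        return running;
    }

    /** A failed task is a candidate for relaunching on a later offer. */
    boolean needsRelaunch(String taskId) {
        return states.get(taskId) == State.TASK_FAILED;
    }

    public static void main(String[] args) {
        TaskTrackerSketch tracker = new TaskTrackerSketch();
        tracker.statusUpdate("1", State.TASK_RUNNING);
        tracker.statusUpdate("2", State.TASK_RUNNING);
        tracker.statusUpdate("2", State.TASK_FAILED);
        System.out.println(tracker.runningCount());     // prints 1
        System.out.println(tracker.needsRelaunch("2")); // prints true
    }
}
```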
git clone https://github.com/sjarvie/mesos_example
Start the Mesos master (ensure the work directory exists and has proper permissions).
$ ./bin/mesos-master.sh --ip=127.0.0.1 --work_dir=/var/lib/mesos
In another session, start the Mesos slave.
$ ./bin/mesos-slave.sh --master=127.0.0.1:5050
After configuring the run_scheduler.sh and run_executor.sh scripts with the proper directory locations, simply run the following command.
sh run_scheduler.sh
To view the Mesos web console, navigate to 127.0.0.1:5050, and you should now be able to see the status of your Mesos cluster.
In the Frameworks Pane, your example framework can be seen along with the tasks it has spawned. The STDOUT/STDERR of the task can also be viewed for debugging purposes.
To access the web server, navigate to 127.0.0.1:8080 to request a directory listing.
This blog post assumes that you have installed Mesos on your local machine. The following notes may be helpful when configuring your environment.
- In the run_scheduler.sh and run_executor.sh scripts, the MESOS_NATIVE_JAVA_LIBRARY environment variable points to the location of the Mesos library files. On an OSX installation, the files can be found in the /usr/local/lib directory.
- The paths in the scripts should be set according to where this repository is cloned. This post points out such instances so that the code runs properly.
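A missing or mistyped library path is an easy mistake to make when editing these scripts. The sketch below is one way to check the variable before starting a scheduler or executor, using only the standard library. The `NativeLibraryCheck` class and its messages are hypothetical and are not part of the example repository.

```java
import java.nio.file.Files;
import java.nio.file.Paths;

/** Hypothetical sanity check for the MESOS_NATIVE_JAVA_LIBRARY setting. */
final class NativeLibraryCheck {

    /** Returns a description of the problem, or null when the path points at a file. */
    static String problemWith(String libraryPath) {
        if (libraryPath == null || libraryPath.isEmpty()) {
            return "MESOS_NATIVE_JAVA_LIBRARY is not set";
        }
        if (!Files.isRegularFile(Paths.get(libraryPath))) {
            return "no library file at " + libraryPath;
        }
        return null;
    }

    public static void main(String[] args) {
        String problem = problemWith(System.getenv("MESOS_NATIVE_JAVA_LIBRARY"));
        System.out.println(problem == null ? "native library path looks usable" : problem);
    }
}
```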
Mesos is a young framework with a growing community of bloggers and contributors. The following links aided the construction of this blog post.