Last updated : 14/7/2024 01:47 GMT+8
Author : Aiman Amri
Config files: https://github.com/aimanamri/raspberry-pi4-hadoop-spark-cluster/tree/main/config_files
This project serves as my self-documentation of learning distributed data storage, parallel processing, and Linux OS using Apache Hadoop, Apache Spark and Raspbian OS.
In this project, we'll set up a 3-node cluster using Raspberry Pi 4 boards, install HDFS, and run Spark processing jobs via YARN. Furthermore, we can scale out horizontally in the future by adding more computing nodes.
Although Hadoop provides MapReduce for processing, we won't cover it in this project. Instead, we will focus on Spark, which is generally faster (thanks to in-memory processing) and easier to work with.
- Hadoop Distributed File System (HDFS): HDFS as the distributed storage solution, enabling efficient data management across multiple nodes.
- Hadoop YARN: YARN acts as the resource manager, ensuring optimal allocation of computing resources for Hadoop and Spark jobs.
- Spark: Distributed processing system used for big data workloads; it utilizes in-memory caching and offers APIs in Java, Scala, Python and R.
- Spark on YARN: Instead of running Spark jobs in standalone mode, Spark applications can be launched in cluster mode or client mode on YARN, as sketched in the example below.
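For illustration, here is a minimal sketch of a submission in each mode (it reuses the SparkPi example jar that appears later in this guide; treat the class and jar path as placeholders for your own application):
# Client mode: the driver runs on the machine you submit from
spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi /home/hadoop/spark-3.5.1/examples/jars/spark-examples_2.12-3.5.1.jar 10
# Cluster mode: the driver runs inside an ApplicationMaster container on the cluster
spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi /home/hadoop/spark-3.5.1/examples/jars/spark-examples_2.12-3.5.1.jar 10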
To replicate this project, you’ll need the following:
- Hadoop 3.4.0: Install Hadoop to set up your distributed environment.
- Spark 3.5.1: Install the latest version of Spark.
- Java 8: Ensure Java 8 or 11 (runtime) is installed on your Raspberry Pi servers.
- Raspberry Pi 4: Here, we're setting up a 3-node cluster, so we'll need three Raspberry Pi 4 devices, each with its own power supply (5V/3A DC or 5.1V/3A DC minimum).
- Ethernet Cables (if using wired connections)
- Micro SD card and Micro SD card reader
[1] How to Install and Set Up a 3-Node Hadoop Cluster
[2] Running Spark on Top of a Hadoop YARN Cluster
[3] Apache Hadoop Documentation
[4] Apache Spark Documentation
[5] Building a Raspberry Pi Hadoop / Spark Cluster
[6] Getting Started with Hadoop & Apache Spark
[7] How to Install a Hadoop Cluster / Multi-Node Setup on Ubuntu Server 18.04
[8] Hadoop: Capacity Scheduler
Stack all the Pis into a rack designed for them, then plug in the power supplies to turn them on.
- Installing OS
I went with Raspberry Pi OS with desktop for the master Pi, which has a GUI, and Raspberry Pi OS Lite for the worker Pis, which only have a CLI. Then, use the Raspberry Pi Imager to write the 64-bit Raspberry Pi OS image to each Pi's SD card.
- Configuring hostname, SSH and network connection
I've set the hostname and SSH credentials on each Pi. For the network connection, I used Wi-Fi. If you are using a wired connection, you can skip the Wi-Fi setup.
However, a wired connection is preferable for stability, especially when transferring files across the Pis.
**The following steps will need to be done on each Pi.**
sudo apt update
sudo apt upgrade -y
sudo raspi-config --expand-rootfs
sudo reboot now
Static IP addresses are required. In this project, I assigned them on my home router. I am using 192.168.35.XX (usually a home network is 192.168.0.XX). Alternatively, you may assign them by editing the /etc/network/interfaces file.
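For example, a static address for pi1 could be configured on the Pi itself with a stanza like the one below. This is only a sketch: it assumes the wired interface is eth0 and the router/gateway is at 192.168.35.1, so adjust it to your network. Note that recent Raspberry Pi OS releases manage networking through dhcpcd or NetworkManager instead, which is why reserving the addresses on the router (as done here) is often simpler.
File: /etc/network/interfaces
auto eth0
iface eth0 inet static
address 192.168.35.11
netmask 255.255.255.0
gateway 192.168.35.1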
**The following steps will need to be done on each Pi.**
For the hostname, I already set it while flashing the OS onto each Pi. Alternatively, you can edit the /etc/hostname file and then reboot.
For each node to communicate with the others by name, edit the /etc/hosts file to add the IP addresses of the three Raspberry Pis. Make sure to delete the 127.0.0.1 localhost line from the file.
File: /etc/hosts
192.168.35.11 pi1
192.168.35.12 pi2
192.168.35.13 pi3
**The following steps will need to be done on each Pi.**
sudo adduser hadoop
sudo usermod -aG sudo hadoop
su - hadoop
**FROM HERE, MAKE SURE THAT YOU ARE LOGGED IN AS THE hadoop USER WHEN RUNNING ALL THE COMMANDS.**
The master node, pi1, will use an SSH connection to connect to the other nodes (pi2, pi3) with key-pair authentication. This will allow the master node to actively manage the cluster.
**Perform these steps on the master Pi only, until directed otherwise.**
- Create the SSH directory (~/.ssh) if it does not exist and set suitable permissions on it.
mkdir ~/.ssh
touch ~/.ssh/config
sudo chmod 700 ~/.ssh/
sudo chmod 600 ~/.ssh/*
- Public/Private Key Pair
Next, create an SSH key pair on the Pi using:
ssh-keygen -t rsa -b 4096
When generating this key, leave the passphrase blank so your hadoop user can connect unprompted.
- Repeat the SSH key generation on the other nodes, pi2 and pi3, while logged in as the hadoop user.
- SSH Aliases
On every Pi, while logged in as the hadoop user, edit the SSH config file using the command below.
nano ~/.ssh/config
File: /home/hadoop/.ssh/config
Host pi1
User hadoop
Hostname 192.168.35.11
Host pi2
User hadoop
Hostname 192.168.35.12
Host pi3
User hadoop
Hostname 192.168.35.13
- Then use the following command on all Pis (including pi1) to copy the public keys into pi1's authorized key list:
ssh-copy-id pi1
- Finally, you can copy pi1's SSH configuration files to the rest of the Pis using the following command (piX below refers to pi2 and pi3):
scp ~/.ssh/authorized_keys piX:~/.ssh/authorized_keys
Now, we can SSH into any of the Pis from the master without entering a password. Please double-check that this works properly.
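For instance, a quick check from pi1 (using the aliases defined above) is to run a remote command on each worker; it should print the worker's hostname without prompting for a password:
ssh pi2 hostname
ssh pi3 hostname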
**Perform these steps on the master Pi only, until directed otherwise.**
To install Java 8, download the tar package from Oracle's official website. OpenJDK 8 is an older version, so it is not available in the Debian repository. See more.
DOWNLOAD LINK (Linux ARM64 Compressed Archive) : https://www.oracle.com/java/technologies/javase/javase8u211-later-archive-downloads.html#license-lightbox
Then, copy the tar package to the worker nodes (piX below refers to pi2 and pi3):
scp ./jdk-8u401-linux-aarch64.tar.gz piX:~/
Repeat the steps below on each Pi.
- Run the following commands after downloading the tar package:
sudo mkdir /usr/lib/jvm/
sudo tar -zxvf jdk-8u401-linux-aarch64.tar.gz -C /usr/lib/jvm/
sudo update-alternatives --install /usr/bin/java java /usr/lib/jvm/jdk1.8.0_401/bin/java 1
- Display Java version
java -version
The output will be as below:
java version "1.8.0_401"
Java(TM) SE Runtime Environment (build 1.8.0_401-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.401-b10, mixed mode)
- To set up the environment variables, run the following and add the lines below to the file:
sudo nano /etc/profile.d/java.sh
export PATH=$PATH:/usr/lib/jvm/jdk1.8.0_401/bin/
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_401/
- Load the variables using:
source /etc/profile.d/java.sh
- Then, run the following to check the Java installation path, JAVA_HOME:
echo $JAVA_HOME
Java 8 installation is complete!
**Perform these steps on the master Pi only, until directed otherwise.**
- Download the Apache Hadoop 3.4.0 aarch64 binary from the Apache Hadoop website using the wget command below:
wget https://dlcdn.apache.org/hadoop/common/hadoop-3.4.0/hadoop-3.4.0-aarch64.tar.gz
Note: The download URL above may differ if a different version is used.
- Extract the file in the hadoop user's home directory, then delete the compressed file.
tar -xzvf hadoop-3.4.0-aarch64.tar.gz
rm hadoop-3.4.0-aarch64.tar.gz
To get the HDFS up and running, we need to modify some configuration files.
- Add the Hadoop binaries to your PATH. Edit /home/hadoop/.profile and add the following line:
File: /home/hadoop/.profile
PATH=/home/hadoop/hadoop-3.4.0/bin:/home/hadoop/hadoop-3.4.0/sbin:$PATH
This will enable us to run Hadoop commands without going into the Hadoop bin and sbin directories!
- Add Hadoop to your PATH for the shell. Edit .bashrc and add the following lines:
File: /home/hadoop/.bashrc
# path and options for java
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_401/
# path and options for Hadoop
export HADOOP_HOME=/home/hadoop/hadoop-3.4.0
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"
Then, source the .bashrc file to ensure it updates.
source ~/.bashrc
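As an optional sanity check, the following should now work from any directory (assuming the PATH and HADOOP_HOME settings above) and report version 3.4.0:
hadoop version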
- Set the value of JAVA_HOME in hadoop-env.sh.
You'll have to scroll down to find the correct line. Uncomment it and set it to your Java installation path.
File: /home/hadoop/hadoop-3.4.0/etc/hadoop/hadoop-env.sh
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_401/
- Set the NameNode location.
Edit the ~/hadoop-3.4.0/etc/hadoop/core-site.xml file to set the NameNode location to pi1 on port 9000.
File: ~/hadoop-3.4.0/etc/hadoop/core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://pi1:9000</value>
</property>
</configuration>
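Optionally, once this file is in place you can confirm that Hadoop picks up the NameNode address with:
hdfs getconf -confKey fs.defaultFS
# expected output: hdfs://pi1:9000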
- Set the paths for HDFS.
Edit hdfs-site.xml to resemble the following configuration:
File: ~/hadoop-3.4.0/etc/hadoop/hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/home/hadoop/hdfs/namenode/</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/home/hadoop/hdfs/datanode/</value>
</property>
</configuration>
The first property, dfs.replication, indicates how many times data is replicated in the cluster (the default is 3). Here it is set to 1; you can set it to 2 to have all the data duplicated on the two worker nodes. Don't enter a value higher than the actual number of worker nodes. An optional way to inspect replication later is shown below.
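Later, once the cluster is running and data has been written, an optional read-only way to inspect block replication is:
hdfs fsck / -files -blocks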
- Then, inside the hadoop user's home directory, create the folders on each node (pi1, pi2, pi3) using:
mkdir -p /home/hadoop/hdfs/{namenode,datanode}
- Set YARN as the job scheduler for MapReduce operations.
Edit the mapred-site.xml file, setting YARN as the default framework for MapReduce operations:
File: ~/hadoop-3.4.0/etc/hadoop/mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value>
</property>
<property>
<name>yarn.app.mapreduce.am.resource.memory-mb</name>
<value>512</value>
</property>
<property>
<name>mapreduce.map.resource.memory-mb</name>
<value>256</value>
</property>
<property>
<name>mapreduce.reduce.resource.memory-mb</name>
<value>256</value>
</property>
</configuration>
Note: If you're running a single node, you can already (1) format the HDFS and (2) run start-dfs.sh, and then start exploring HDFS.
- Edit yarn-site.xml, which contains the configuration options for YARN.
To configure the ResourceManager to use the CapacityScheduler and set up queues (dev and prod), the following property needs to be included:
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
File: ~/hadoop-3.4.0/etc/hadoop/yarn-site.xml
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
<property>
<name>yarn.acl.enable</name>
<value>0</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>pi1</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>1536</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>1536</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>128</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>prod,dev</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.prod.capacity</name>
<value>50</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.dev.capacity</name>
<value>50</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.dev.maximum-capacity</name>
<value>50</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.prod.maximum-capacity</name>
<value>70</value>
</property>
</configuration>
Note:
Based on the properties above, each queue is allocated 50% of the cluster's capacity. We also define how much of the resources each queue can use at maximum; if one queue does not use all the resources allocated to it, YARN can allocate those resources to the other queue. For example, the dev queue will never use more than 50% of the resources in the system, whereas YARN allows the prod queue to use up to 70% of the resources if the dev queue is idle.
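Once YARN is up (later in this guide), you can optionally verify the queue setup from the command line, for example:
yarn queue -status dev
mapred queue -list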
- Then, edit capacity-scheduler.xml by copying it exactly as written in this file.
File: ~/hadoop-3.4.0/etc/hadoop/capacity-scheduler.xml
For the Master:
Edit the ~/hadoop-3.4.0/etc/hadoop/master file.
pi1
For the Workers:
Edit the ~/hadoop-3.4.0/etc/hadoop/workers file.
pi2
pi3
- SSH into pi2 and then pi3 and run the following commands:
sudo mkdir hadoop-3.4.0
sudo chown -R hadoop:hadoop hadoop-3.4.0/
- Copy the Hadoop config files to the worker nodes from master node:
for pi in pi2 pi3; do rsync -avxP ~/hadoop-3.4.0/ $pi:/home/hadoop/hadoop-3.4.0/; done
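As an optional spot-check that the copy succeeded, list the configuration directory on a worker remotely:
ssh pi2 'ls ~/hadoop-3.4.0/etc/hadoop'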
To run for the first time, the HDFS needs to be formatted. Hence, run the following command on the master node, pi1:
*THIS WILL DELETE ALL DATA IN THE HDFS*
hdfs namenode -format
Our Hadoop installation is now configured and ready to run!
- Run the following command to start the HDFS and the YARN:
start-dfs.sh && start-yarn.sh
- Check that every process is running with the jps command on each node. On the master node, pi1, we should see the following (the PID numbers will be different):
5733 Jps
5623 ResourceManager
5435 SecondaryNameNode
5245 NameNode
And on pi2 and pi3 we should see the following:
1538 DataNode
1720 Jps
1640 NodeManager
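Beyond jps, you can optionally confirm that both DataNodes and NodeManagers have registered with the master:
hdfs dfsadmin -report
yarn node -list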
- To stop HDFS and YARN on master and worker nodes, run the following command from pi1:
stop-dfs.sh && stop-yarn.sh
- HDFS : http://pi1:9870
- YARN : http://pi1:8088 (default port: 8088)
**The following steps only need to be performed on the master Pi, pi1.**
- Download the Apache Spark 3.5.1 binary from the Apache Spark website using the wget command below:
wget https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
Note: The download URL above may differ if a different version is used.
- Extract the file in the hadoop user's home directory and rename the directory. Then, delete the compressed file.
tar -xzvf spark-3.5.1-bin-hadoop3.tgz
mv spark-3.5.1-bin-hadoop3 spark-3.5.1
rm spark-3.5.1-bin-hadoop3.tgz
- Add the Spark binaries to your PATH. Edit /home/hadoop/.profile and add the following line:
File: /home/hadoop/.profile
# add SPARK bin and sbin to the end of the line PATH variable
PATH=/home/hadoop/spark-3.5.1/bin:/home/hadoop/spark-3.5.1/sbin:$PATH
- Add Spark to your PATH for the shell. Edit .bashrc and add the following lines:
File: /home/hadoop/.bashrc
# path and options for Spark
export SPARK_HOME=/home/hadoop/spark-3.5.1
export PATH=$PATH:$SPARK_HOME/bin
export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native:$LD_LIBRARY_PATH
export PYSPARK_PYTHON=python3
# add SPARK bin and sbin to the end of the line PATH variable
export PATH=/home/hadoop/spark-3.5.1/bin:/home/hadoop/spark-3.5.1/sbin:$PATH
Then, source the .bashrc file to ensure it updates.
source ~/.bashrc
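As a quick sanity check that the Spark binaries are now on the PATH, the following should print version 3.5.1:
spark-submit --version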
- Configure Workers
Copy workers.template to a workers file in the spark-3.5.1/conf directory.
cp workers.template workers
Then, add the hostnames/IP addresses of the worker nodes at the end of the workers file.
File: /home/hadoop/spark-3.5.1/conf/workers
pi2
pi3
- Configure Memory Allocation
For nodes with less than 4 GB of RAM, the default configuration is not adequate and may trigger swapping, poor performance, or even failure to initialize the application due to lack of memory. See more in [2]. In the spark-3.5.1/conf directory, copy the template:
cp spark-defaults.conf.template spark-defaults.conf
- Edit spark-defaults.conf and add the following configurations.
File: /home/hadoop/spark-3.5.1/conf/spark-defaults.conf
spark.driver.memory 512m
spark.yarn.am.memory 512m
spark.executor.memory 512m
- Enable the Spark Web UI to track your submitted jobs by editing the spark-defaults.conf file and adding the following lines.
File: /home/hadoop/spark-3.5.1/conf/spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://pi1:9000/spark-logs
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.fs.logDirectory hdfs://pi1:9000/spark-logs
spark.history.fs.update.interval 10s
spark.history.ui.port 18080
If you specify a larger interval, there will be some delay between what you see in the History Server and the real-time status of your application. If you use a shorter interval, you will increase I/O on the HDFS.
- Restart HDFS and YARN if stopped. Then, create the log directory in the HDFS:
start-dfs.sh && start-yarn.sh
hdfs dfs -mkdir /spark-logs
- With HDFS and YARN already running, start the History Server:
start-history-server.sh
- To ensure all required daemons are running, run the jps command. The output should be as below:
7971 Jps
7879 HistoryServer
7417 ResourceManager
7195 SecondaryNameNode
7038 NameNode
- Run the SparkPi parallel processing example job to generate logs in the HDFS:
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --queue dev /home/hadoop/spark-3.5.1/examples/jars/spark-examples_2.12-3.5.1.jar 10
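Optionally, you can also track the job from the command line: yarn application -list shows submitted and running applications, and once you have an application ID you can pull its aggregated logs (the ID below is a placeholder):
yarn application -list
yarn logs -applicationId application_XXXXXXXXXX_XXXX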
- Access the History Server by navigating to http://pi1:18080 in a web browser.
- To stop all running daemons:
stop-dfs.sh && stop-yarn.sh && stop-history-server.sh
Ensure that HDFS and YARN are running. To confirm, run jps on each of the nodes.
As an example, we are going to use the Titanic dataset. Download the CSV file using wget in another terminal:
wget https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv
Then, upload it to HDFS and confirm that the file has been uploaded.
hdfs dfs -put ~/titanic.csv /titanic.csv
hdfs dfs -ls /
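Optionally, you can also preview the first few lines of the file straight from HDFS:
hdfs dfs -cat /titanic.csv | head -n 5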
- Start Spark shell.
spark-shell --master yarn --queue dev --name APPLICATION_NAME_SCALA
- Loading Data
val df = spark.read.format("csv").option("header",true).option("sep",",").load("hdfs:///titanic.csv")
df.show(10,false)
- To exit, enter:
:q
- Start PySpark shell.
pyspark --master yarn --queue dev --name APPLICATION_NAME_PYSPARK
- Loading Data
df = spark.read.format("csv").option("header",True).option("sep",",").load("hdfs:///titanic.csv")
df.show(10,False)
- To exit, enter:
exit()
Here, I will show Python only, but you can also run Scala and R scripts.
- Create a Python file named script.py.
- Edit the file to run a simple task:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace

# Create a SparkSession
spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()

# Build a small sample DataFrame
sample_data = [{"name": "John D.", "age": 30},
               {"name": "Alice G.", "age": 25},
               {"name": "Bob T.", "age": 35},
               {"name": "Eve A.", "age": 28}]
df = spark.createDataFrame(sample_data)

# Remove additional spaces in name
def remove_extra_spaces(df, column_name):
    # Remove extra spaces from the specified column
    df_transformed = df.withColumn(column_name, regexp_replace(col(column_name), "\\s+", " "))
    return df_transformed

transformed_df = remove_extra_spaces(df, "name")
transformed_df.show()
- Submit Python script as Spark job.
spark-submit --master yarn --queue prod ~/script.py
Other examples for running Scala and R (from the Spark home directory, /home/hadoop/spark-3.5.1):
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R
We now have a working Spark cluster. Keep exploring!