Run on Hadoop/YARN Clusters
This tutorial provides a step-by-step guide on how to run BigDL-Orca programs on Apache Hadoop/YARN clusters, using a PyTorch Fashion-MNIST program as a working example.
The Client Node that appears in this tutorial refers to the machine where you launch or submit your applications.
1. Basic Concepts
1.1 init_orca_context
A BigDL Orca program usually starts with the initialization of OrcaContext. For every BigDL Orca program, you should call init_orca_context at the beginning of the program as below:
from bigdl.orca import init_orca_context
sc = init_orca_context(cluster_mode, cores, memory, num_nodes,
                       driver_cores, driver_memory, extra_python_lib, conf)
In init_orca_context, you may specify necessary runtime configurations for running the example on YARN, including:
- cluster_mode: one of "yarn-client", "yarn-cluster", "bigdl-submit" or "spark-submit" when you run on Hadoop/YARN clusters.
- cores: an integer that specifies the number of cores for each executor (default to be 2).
- memory: a string that specifies the memory for each executor (default to be "2g").
- num_nodes: an integer that specifies the number of executors (default to be 1).
- driver_cores: an integer that specifies the number of cores for the driver node (default to be 4).
- driver_memory: a string that specifies the memory for the driver node (default to be "1g").
- extra_python_lib: a string that specifies the path to extra Python packages, separated by comma (default to be None). .py, .zip or .egg files are supported.
- conf: a dictionary to append extra conf for Spark (default to be None). An example call with these arguments is given after the note below.
Note:
- All the arguments except cluster_mode will be ignored when using bigdl-submit or spark-submit to submit and run Orca programs, in which case you are supposed to specify these configurations via the submit command.
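For example, an initialization for yarn-client mode might look like the sketch below; the resource values and the extra Spark configuration entry are illustrative only and should be adjusted to your cluster:
from bigdl.orca import init_orca_context

# Illustrative resource settings and an extra Spark conf entry
sc = init_orca_context(cluster_mode="yarn-client", cores=4, memory="10g", num_nodes=2,
                       driver_cores=2, driver_memory="4g",
                       extra_python_lib="model.py",
                       conf={"spark.executor.memoryOverhead": "2g"})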
After your Orca program finishes, you should always call stop_orca_context at the end of the program to release resources and shut down the underlying distributed runtime engine (such as Spark or Ray).
from bigdl.orca import stop_orca_context
stop_orca_context()
For more details, please see OrcaContext.
1.2 Yarn-Client & Yarn-Cluster
The difference between yarn-client mode and yarn-cluster mode is where you run your Spark driver.
For yarn-client, the Spark driver runs in the client process, and the application master is only used for requesting resources from YARN, while for yarn-cluster the Spark driver runs inside an application master process which is managed by YARN in the cluster.
For more details, please see Launching Spark on YARN.
1.3 Distributed storage on YARN
Note:
- When you run programs on YARN, you are highly recommended to load/write data from/to a distributed storage (e.g. HDFS or S3) instead of the local file system.
The Fashion-MNIST example in this tutorial uses a utility function get_remote_dir_to_local provided by BigDL to download datasets and create the PyTorch DataLoader on each executor.
import torch
import torchvision
import torchvision.transforms as transforms
from bigdl.orca.data.file import get_remote_dir_to_local
def train_data_creator(config, batch_size):
    transform = transforms.Compose([transforms.ToTensor(),
                                    transforms.Normalize((0.5,), (0.5,))])
    # Download the dataset from the distributed storage to a local path on each executor
    get_remote_dir_to_local(remote_path="hdfs://path/to/dataset", local_path="/tmp/dataset")
    trainset = torchvision.datasets.FashionMNIST(root="/tmp/dataset", train=True,
                                                 download=False, transform=transform)
    # Create the PyTorch DataLoader from the local copy of the dataset
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                              shuffle=True, num_workers=0)
    return trainloader
2. Prepare Environment
Before running BigDL Orca programs on YARN, you need to properly set up the environment following the steps below.
2.1 Setup JAVA & Hadoop Environment
- See here to prepare Java in your cluster.
- Check the Hadoop setup and configurations of your cluster. Make sure you correctly set the environment variable HADOOP_CONF_DIR, which is needed to initialize Spark on YARN (a quick sanity check is shown after the command below):
export HADOOP_CONF_DIR=/path/to/hadoop/conf
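To quickly verify that the Hadoop configuration is visible from the Client Node, you can list the HDFS root directory (this assumes the Hadoop client tools are available on the Client Node):
# List the HDFS root directory to confirm the configuration is picked up
hdfs dfs -ls /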
2.2 Install Python Libraries
- See here to install conda and prepare the Python environment on the Client Node.
- See here to install BigDL Orca in the created conda environment.
- You should install all the other Python libraries that you need in your program in the conda environment as well. torch and torchvision are needed to run the Fashion-MNIST example (a combined sketch of these setup steps is shown after this list):
pip install torch torchvision
- For more details, please see Python User Guide.
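Putting these steps together, the setup on the Client Node might look like the following sketch; the environment name and Python version are illustrative, so follow the guides linked above for the recommended versions:
# Create and activate a conda environment for the example (name and Python version are illustrative)
conda create -y -n bigdl-orca-env python=3.7
conda activate bigdl-orca-env
# Install BigDL Orca and the libraries required by the Fashion-MNIST example
pip install bigdl-orca
pip install torch torchvision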
2.3 Run on CDH
- For CDH users, the environment variable HADOOP_CONF_DIR should be /etc/hadoop/conf by default.
- The Client Node may have already installed a different version of Spark than the one installed with BigDL. To avoid conflicts, unset all Spark-related environment variables (you may use env | grep SPARK to find all of them; a one-line sketch is also given after the commands below):
unset SPARK_HOME
unset ...
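If there are many Spark-related variables, the one-line sketch below unsets every variable whose name is reported by env | grep SPARK; unsetting them one by one works equally well:
# Unset all environment variables reported by env | grep SPARK
for v in $(env | grep SPARK | cut -d= -f1); do unset "$v"; done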
3. Prepare Dataset
To run the Fashion-MNIST example provided by this tutorial on YARN, you should upload the Fashion-MNIST dataset to a distributed storage (such as HDFS or S3).
First, download the Fashion-MNIST dataset manually on your Client Node. Note that the PyTorch FashionMNIST dataset requires the unzipped files to be located in FashionMNIST/raw/ under the dataset root folder.
# Download the Fashion-MNIST dataset
git clone https://github.com/zalandoresearch/fashion-mnist.git
# Move the dataset under the folder FashionMNIST/raw
mv /path/to/fashion-mnist/data/fashion /path/to/local/data/FashionMNIST/raw
# Extract the Fashion-MNIST archives
gzip -dk /path/to/local/data/FashionMNIST/raw/*
Then upload it to a distributed storage. A sample command to upload the data to HDFS is as follows:
hdfs dfs -put /path/to/local/data/FashionMNIST hdfs://path/to/remote/data
In the given example, you can specify the argument --remote_dir to be the directory on a distributed storage for the Fashion-MNIST dataset.
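As a reference for how these arguments might be wired up, the following is a minimal, hypothetical sketch of the argument handling in train.py; the actual script of the example may parse more options:
import argparse

# Hypothetical argument parsing for the training script
parser = argparse.ArgumentParser(description="Orca Fashion-MNIST example")
parser.add_argument("--cluster_mode", type=str, default="yarn-client",
                    help="One of yarn-client, yarn-cluster, bigdl-submit or spark-submit")
parser.add_argument("--remote_dir", type=str, required=True,
                    help="Directory of the Fashion-MNIST dataset on the distributed storage")
args = parser.parse_args()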
4. Prepare Custom Modules
Spark allows uploading Python files (.py) and zipped Python packages (.zip) across the cluster by setting the --py-files option in Spark scripts or specifying extra_python_lib in init_orca_context.
The FashionMNIST example needs to import modules from model.py (a sketch of this file is given after the list below).
- When using the python command, please specify extra_python_lib in init_orca_context.
init_orca_context(..., extra_python_lib="model.py")
For more details, please see BigDL Python Dependencies.
- When using bigdl-submit or spark-submit, please specify the --py-files option in the submit command.
bigdl-submit # or spark-submit
...
--py-files model.py
...
For more details, please see Spark Python Dependencies.
- After uploading model.py to YARN, you can import this custom module as follows:
from model import model_creator, optimizer_creator
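For reference, a minimal sketch of what model.py might contain is given below; the network architecture and hyper-parameters are purely illustrative and may differ from the actual Fashion-MNIST example:
# model.py: creator functions imported by the training script
import torch
import torch.nn as nn

def model_creator(config):
    # A small fully-connected network for 28x28 Fashion-MNIST images (illustrative)
    return nn.Sequential(
        nn.Flatten(),
        nn.Linear(28 * 28, 128),
        nn.ReLU(),
        nn.Linear(128, 10)
    )

def optimizer_creator(model, config):
    # The optimizer used to train the model; the learning rate is illustrative
    return torch.optim.SGD(model.parameters(), lr=config.get("lr", 0.01))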
Note:
If your program depends on a nested directory of Python files, you are recommended to follow the steps below to use a zipped package instead.
- Compress the directory into a zipped package.
zip -q -r FashionMNIST_zipped.zip FashionMNIST
- Upload the zipped package (FashionMNIST_zipped.zip) to YARN by setting --py-files or specifying extra_python_lib as discussed above.
- You can then import the custom modules from the unzipped file in your program as follows:
from FashionMNIST.model import model_creator, optimizer_creator
5. Run Jobs on YARN
In the following part, we will illustrate three ways to submit and run BigDL Orca applications on YARN.
- Use the python command
- Use bigdl-submit
- Use spark-submit
You can choose one of them based on your preference or cluster settings.
We provide the running command for the Fashion-MNIST example in this section.
5.1 Use python Command
This is the easiest and most recommended way to run BigDL Orca on YARN as a normal Python program. With this approach, you only need to prepare the environment on the Client Node; the environment will be automatically packaged and distributed to the YARN cluster.
See here for the runtime configurations.
5.1.1 Yarn Client
Run the example with the following command by setting the cluster_mode to "yarn-client":
python train.py --cluster_mode yarn-client --remote_dir hdfs://path/to/remote/data
5.1.2 Yarn Cluster
Run the example with the following command by setting the cluster_mode to "yarn-cluster":
python train.py --cluster_mode yarn-cluster --remote_dir hdfs://path/to/remote/data
5.1.3 Jupyter Notebook
You can easily run the example in a Jupyter Notebook using yarn-client mode. Launch the notebook using the following command:
jupyter notebook --notebook-dir=/path/to/notebook/directory --ip=* --no-browser
You can copy the code in train.py to the notebook and run the cells. Set the cluster_mode to "yarn-client" in init_orca_context.
sc = init_orca_context(cluster_mode="yarn-client", cores=4, memory="10g", num_nodes=2,
                       driver_cores=2, driver_memory="4g",
                       extra_python_lib="model.py")
Note that Jupyter Notebook cannot run in yarn-cluster mode, as the driver is not running on the Client Node (where you run the notebook).
5.2 Use bigdl-submit
For users who prefer a submit script to the python command, BigDL provides an easy-to-use bigdl-submit script, which automatically sets up the BigDL configuration and jar files from the currently active conda environment.
Set the cluster_mode to "bigdl-submit" in init_orca_context.
sc = init_orca_context(cluster_mode="bigdl-submit")
Pack the currently active conda environment into an archive on the Client Node before submitting the example:
conda pack -o environment.tar.gz
Some runtime configurations for Spark are as follows:
- --executor-memory: the memory for each executor.
- --driver-memory: the memory for the driver node.
- --executor-cores: the number of cores for each executor.
- --num-executors: the number of executors.
- --py-files: the extra Python dependency files to be uploaded to YARN.
- --archives: the conda archive to be uploaded to YARN.
5.2.1 Yarn Client
Submit and run the example in yarn-client mode with the bigdl-submit script below:
bigdl-submit \
--master yarn \
--deploy-mode client \
--executor-memory 10g \
--driver-memory 4g \
--executor-cores 4 \
--num-executors 2 \
--py-files model.py \
--archives /path/to/environment.tar.gz#environment \
--conf spark.pyspark.driver.python=/path/to/python \
--conf spark.pyspark.python=environment/bin/python \
train.py --cluster_mode bigdl-submit --remote_dir hdfs://path/to/remote/data
In the bigdl-submit script:
- --master: the Spark master; set it to "yarn".
- --deploy-mode: set it to "client" when running programs in yarn-client mode.
- --conf spark.pyspark.driver.python: set the active Python location on the Client Node as the driver's Python environment. You can find it by running which python.
- --conf spark.pyspark.python: set the Python location in the conda archive as each executor's Python environment.
5.2.2 Yarn Cluster
Submit and run the program in yarn-cluster mode with the bigdl-submit script below:
bigdl-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 10g \
--driver-memory 4g \
--executor-cores 4 \
--num-executors 2 \
--py-files model.py \
--archives /path/to/environment.tar.gz#environment \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=environment/bin/python \
--conf spark.executorEnv.PYSPARK_PYTHON=environment/bin/python \
train.py --cluster_mode bigdl-submit --remote_dir hdfs://path/to/remote/data
In the bigdl-submit script:
- --master: the Spark master; set it to "yarn".
- --deploy-mode: set it to "cluster" when running programs in yarn-cluster mode.
- --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON: set the Python location in the conda archive as the Python environment of the Application Master.
- --conf spark.executorEnv.PYSPARK_PYTHON: also set the Python location in the conda archive as each executor's Python environment. The Application Master and the executors will all use the archive for the Python environment.
5.3 Use spark-submit
If you are not able to install BigDL using conda on the Client Node, please use the spark-submit script instead.
Set the cluster_mode to "spark-submit" in init_orca_context.
sc = init_orca_context(cluster_mode="spark-submit")
Before submitting the application on the Client Node, you need to:
- Prepare the conda environment on a Development Node where conda is available and pack the conda environment to an archive:
conda pack -o environment.tar.gz
- Send the conda archive to the Client Node:
scp /path/to/environment.tar.gz username@client_ip:/path/to/
On the Client Node:
- Download Spark and set up the environment variables ${SPARK_HOME} and ${SPARK_VERSION}:
export SPARK_HOME=/path/to/spark # the folder path where you extract the Spark package
export SPARK_VERSION="downloaded spark version"
- Download and unzip a BigDL assembly package from BigDL Assembly Spark 2.4.6 or BigDL Assembly Spark 3.1.2 (according to your Spark version), then set up the environment variables ${BIGDL_HOME} and ${BIGDL_VERSION} (a quick sanity check is given after this step):
export BIGDL_HOME=/path/to/unzipped_BigDL
export BIGDL_VERSION="downloaded BigDL version"
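You can optionally sanity-check both downloads before submitting, assuming the variables above are set correctly:
# Verify the Spark installation and the contents of the BigDL assembly
${SPARK_HOME}/bin/spark-submit --version
ls ${BIGDL_HOME}/jars ${BIGDL_HOME}/python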
Some runtime configurations for Spark are as follows:
- --executor-memory: the memory for each executor.
- --driver-memory: the memory for the driver node.
- --executor-cores: the number of cores for each executor.
- --num-executors: the number of executors.
- --py-files: the extra Python dependency files to be uploaded to YARN.
- --archives: the conda archive to be uploaded to YARN.
5.3.1 Yarn Client
Submit and run the program in yarn-client mode with the spark-submit script below:
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode client \
--executor-memory 10g \
--driver-memory 4g \
--executor-cores 4 \
--num-executors 2 \
--archives /path/to/environment.tar.gz#environment \
--properties-file ${BIGDL_HOME}/conf/spark-bigdl.conf \
--conf spark.pyspark.driver.python=/path/to/python \
--conf spark.pyspark.python=environment/bin/python \
--py-files ${BIGDL_HOME}/python/bigdl-spark_${SPARK_VERSION}-${BIGDL_VERSION}-python-api.zip,model.py \
--jars ${BIGDL_HOME}/jars/bigdl-assembly-spark_${SPARK_VERSION}-${BIGDL_VERSION}-jar-with-dependencies.jar \
train.py --cluster_mode spark-submit --remote_dir hdfs://path/to/remote/data
In the spark-submit script:
- --master: the Spark master; set it to "yarn".
- --deploy-mode: set it to "client" when running programs in yarn-client mode.
- --properties-file: the BigDL configuration properties to be uploaded to YARN.
- --conf spark.pyspark.driver.python: set the active Python location on the Client Node as the driver's Python environment. You can find the location by running which python.
- --conf spark.pyspark.python: set the Python location in the conda archive as each executor's Python environment.
- --jars: upload and register the BigDL jars to YARN.
5.3.2 Yarn Cluster
Submit and run the program in yarn-cluster mode with the spark-submit script below:
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 4g \
--driver-memory 4g \
--executor-cores 4 \
--num-executors 2 \
--archives /path/to/environment.tar.gz#environment \
--properties-file ${BIGDL_HOME}/conf/spark-bigdl.conf \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=environment/bin/python \
--conf spark.executorEnv.PYSPARK_PYTHON=environment/bin/python \
--py-files ${BIGDL_HOME}/python/bigdl-spark_${SPARK_VERSION}-${BIGDL_VERSION}-python-api.zip,model.py \
--jars ${BIGDL_HOME}/jars/bigdl-assembly-spark_${SPARK_VERSION}-${BIGDL_VERSION}-jar-with-dependencies.jar \
train.py --cluster_mode spark-submit --remote_dir hdfs://path/to/remote/data
In the spark-submit script:
- --master: the Spark master; set it to "yarn".
- --deploy-mode: set it to "cluster" when running programs in yarn-cluster mode.
- --properties-file: the BigDL configuration properties to be uploaded to YARN.
- --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON: set the Python location in the conda archive as the Python environment of the Application Master.
- --conf spark.executorEnv.PYSPARK_PYTHON: also set the Python location in the conda archive as each executor's Python environment. The Application Master and the executors will all use the archive for the Python environment.
- --jars: upload and register the BigDL jars to YARN.