# Distributed Training and Inference
---
**Orca `Estimator` provides sklearn-style APIs for transparently distributed model training and inference** 
### 1. Estimator
To perform distributed training and inference, the user can first create an Orca `Estimator` from any standard (single-node) TensorFlow, Keras or PyTorch model, and then call the `Estimator.fit` or `Estimator.predict` methods (using the [data-parallel processing pipeline](./data-parallel-processing.md) as input).
Under the hood, the Orca `Estimator` will replicate the model on each node in the cluster, feed the data partition (generated by the data-parallel processing pipeline) on each node to the local model replica, and synchronize model parameters using various *backend* technologies (such as *Horovod*, `tf.distribute.MirroredStrategy`, `torch.distributed`, or the parameter sync layer in [*BigDL*](https://github.com/intel-analytics/BigDL)).
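The gradient-averaging idea behind these backends can be illustrated with a toy, self-contained sketch (plain NumPy, not Orca code): each replica computes a gradient on its own data shard, and an all-reduce-style average keeps the replicas' parameters identical.

```python
import numpy as np

# Toy illustration (NOT Orca code) of data-parallel training:
# each replica computes a gradient on its own data shard, and the
# backend keeps parameters in sync by averaging the gradients.
def replica_gradient(w, shard_x, shard_y):
    # gradient of mean squared error for a 1-D linear model y = w * x
    pred = w * shard_x
    return np.mean(2 * (pred - shard_y) * shard_x)

w = 0.0
x = np.arange(1.0, 9.0)                          # 8 samples
y = 3.0 * x                                      # ground truth: w = 3
shards = np.split(np.stack([x, y]), 4, axis=1)   # 4 "nodes", 2 samples each

for _ in range(100):
    grads = [replica_gradient(w, sx, sy) for sx, sy in shards]
    w -= 0.01 * np.mean(grads)   # "all-reduce" (average) then update

print(round(w, 2))  # → 3.0
```

Because every replica applies the same averaged gradient, all copies of the model stay in sync; this is the essential behavior the real backends implement efficiently over the network.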
### 2. TensorFlow/Keras Estimator
#### 2.1 TensorFlow 1.15 and Keras 2.3
There are two ways to create an Estimator for TensorFlow 1.15: either from a low-level computation graph or from a Keras model. Examples are as follows:
TensorFlow Computation Graph:
```python
# define inputs to the graph
images = tf.placeholder(dtype=tf.float32, shape=(None, 28, 28, 1))
labels = tf.placeholder(dtype=tf.int32, shape=(None,))

# define the network and loss
logits = lenet(images)
loss = tf.reduce_mean(tf.losses.sparse_softmax_cross_entropy(logits=logits, labels=labels))

# define a metric
acc = accuracy(logits, labels)

# create an estimator using endpoints of the graph
est = Estimator.from_graph(inputs=images,
                           outputs=logits,
                           labels=labels,
                           loss=loss,
                           optimizer=tf.train.AdamOptimizer(),
                           metrics={"acc": acc})
```
Keras Model:
```python
model = create_keras_lenet_model()
model.compile(optimizer=keras.optimizers.RMSprop(),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
est = Estimator.from_keras(keras_model=model)
```
Then users can perform distributed model training and inference as follows:
```python
dataset = tfds.load(name="mnist", split="train")
dataset = dataset.map(preprocess)
est.fit(data=dataset,
        batch_size=320,
        epochs=max_epoch)
predictions = est.predict(data=df,
                          feature_cols=['image'])
```
The `data` argument in the `fit` method can be a Spark DataFrame, an *XShards* or a `tf.data.Dataset`. The `data` argument in the `predict` method can be a Spark DataFrame or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.
View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#module-bigdl.orca.learn.tf.estimator) for more details.
#### 2.2 TensorFlow 2.x and Keras 2.4+
**Using `ray` or *Horovod* backend**
Users can create an `Estimator` for TensorFlow 2.x from a Keras model (using a _Model Creator Function_) when the backend is `ray` (currently the default for TF2) or *Horovod*. For example:
```python
def model_creator(config):
    model = create_keras_lenet_model()
    model.compile(optimizer=keras.optimizers.RMSprop(),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model

est = Estimator.from_keras(model_creator=model_creator)  # or backend="horovod"
```
The `model_creator` argument should be a function that takes a `config` dictionary and returns a compiled Keras model.
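The contract can be sketched in plain Python (illustrative only; a real `model_creator` would return a compiled Keras model rather than a dict, and the names here are hypothetical):

```python
# Plain-Python sketch of the creator-function contract; a real
# model_creator would build and compile a Keras model instead of
# returning a dict.
def model_creator(config):
    # hyperparameters arrive through the `config` dict that was
    # passed when the Estimator was created
    return {"name": "lenet", "lr": config.get("lr", 1e-3)}

# under the hood, every worker calls the creator to build its own local replica
replicas = [model_creator({"lr": 1e-2}) for _ in range(2)]
print(replicas[0]["lr"])  # → 0.01
```

Because the model is built inside the function on each worker (rather than serialized from the driver), every replica is constructed fresh from the same `config`.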
Then users can perform distributed model training and inference as follows:
```python
def train_data_creator(config, batch_size):
    dataset = tfds.load(name="mnist", split="train")
    dataset = dataset.map(preprocess)
    dataset = dataset.batch(batch_size)
    return dataset

stats = est.fit(data=train_data_creator,
                epochs=max_epoch,
                steps_per_epoch=total_size // batch_size)
predictions = est.predict(data=df,
                          feature_cols=['image'])
```
The `data` argument in the `fit` method can be a Spark DataFrame, an *XShards* or a *Data Creator Function* (that returns a `tf.data.Dataset`). The `data` argument in the `predict` method can be a Spark DataFrame or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.
View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#orca-learn-tf2-tf2-ray-estimator) for more details.
**Using *spark* backend**
Users can create an `Estimator` for TensorFlow 2.x using the *spark* backend as follows:
```python
def model_creator(config):
    model = create_keras_lenet_model()
    model.compile(**compile_args(config))
    return model

def compile_args(config):
    if "lr" in config:
        lr = config["lr"]
    else:
        lr = 1e-2
    args = {
        "optimizer": keras.optimizers.SGD(lr),
        "loss": "mean_squared_error",
        "metrics": ["mean_squared_error"]
    }
    return args

est = Estimator.from_keras(model_creator=model_creator,
                           config={"lr": 1e-2},
                           workers_per_node=2,
                           backend="spark",
                           model_dir=model_dir)
```
The `model_creator` argument should be a function that takes a `config` dictionary and returns a compiled Keras model.
The `model_dir` argument is required for the *spark* backend; it should be a shared filesystem path that the executors can access in cluster mode.
Then users can perform distributed model training and inference as follows:
```python
def train_data_creator(config, batch_size):
    dataset = tfds.load(name="mnist", split="train")
    dataset = dataset.map(preprocess)
    dataset = dataset.batch(batch_size)
    return dataset

stats = est.fit(data=train_data_creator,
                epochs=max_epoch,
                steps_per_epoch=total_size // batch_size)
predictions = est.predict(data=df,
                          feature_cols=['image']).collect()
```
The `data` argument in the `fit` method can be a Spark DataFrame, an *XShards* or a *Data Creator Function* (that returns a `tf.data.Dataset`). The `data` argument in the `predict` method can be a Spark DataFrame or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.
View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#orca-learn-tf2-tf2-spark-estimator) for more details.
### 3. PyTorch Estimator
**Using *BigDL* backend**
Users may create a PyTorch `Estimator` using the *BigDL* backend (currently the default for PyTorch) as follows:
```python
def model_creator(config):
    model = LeNet()  # a torch.nn.Module
    model.train()
    return model

def optimizer_creator(model, config):
    return torch.optim.Adam(model.parameters(), config["lr"])

est = Estimator.from_torch(model=model_creator,
                           optimizer=optimizer_creator,
                           loss=nn.NLLLoss(),
                           config={"lr": 1e-2})
```
Then users can perform distributed model training and inference as follows:
```python
est.fit(data=train_loader, epochs=args.epochs)
predictions = est.predict(xshards)
```
The input to the `fit` method can be a `torch.utils.data.DataLoader`, a Spark DataFrame, an *XShards*, or a *Data Creator Function* (that returns a `torch.utils.data.DataLoader`). The input to the `predict` method should be a Spark DataFrame or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.
View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#orca-learn-pytorch-pytorch-spark-estimator) for more details.
**Using `torch.distributed` or *Horovod* backend**
Alternatively, users can create a PyTorch `Estimator` with the `torch.distributed` or *Horovod* backend by setting the `backend` argument to `"ray"` or `"horovod"`. In this case, the `model` and `optimizer` should be wrapped in _Creator Functions_. For example:
```python
def model_creator(config):
    model = LeNet()  # a torch.nn.Module
    model.train()
    return model

def optimizer_creator(model, config):
    return torch.optim.Adam(model.parameters(), config["lr"])

est = Estimator.from_torch(model=model_creator,
                           optimizer=optimizer_creator,
                           loss=nn.NLLLoss(),
                           config={"lr": 1e-2},
                           backend="ray")  # or backend="horovod"
```
Then users can perform distributed model training and inference as follows:
```python
est.fit(data=train_loader_func, epochs=args.epochs)
predictions = est.predict(data=df,
                          feature_cols=['image'])
```
The `data` argument in the `fit` method can be a Spark DataFrame, an *XShards*, or a *Data Creator Function* (that returns a `torch.utils.data.DataLoader`). The `data` argument in the `predict` method can be a Spark DataFrame or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.
View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#orca-learn-pytorch-pytorch-ray-estimator) for more details.
### 4. MXNet Estimator
The user may create an MXNet `Estimator` as follows:
```python
from bigdl.orca.learn.mxnet import Estimator, create_config

def get_model(config):
    net = LeNet()  # a mxnet.gluon.Block
    return net

def get_loss(config):
    return gluon.loss.SoftmaxCrossEntropyLoss()

config = create_config(log_interval=2, optimizer="adam",
                       optimizer_params={'learning_rate': 0.02})
est = Estimator.from_mxnet(config=config,
                           model_creator=get_model,
                           loss_creator=get_loss,
                           num_workers=2)
```
Then the user can perform distributed model training as follows:
```python
import numpy as np

def get_train_data_iter(config, kv):
    train = mx.io.NDArrayIter(data_ndarray, label_ndarray,
                              batch_size=config["batch_size"], shuffle=True)
    return train

est.fit(get_train_data_iter, epochs=2)
```
The input to the `fit` method can be an *XShards*, or a *Data Creator Function* (that returns an MXNet `DataIter` or `DataLoader`). See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.
### 5. BigDL Estimator
The user may create a BigDL `Estimator` as follows:
```python
from bigdl.dllib.nn.criterion import *
from bigdl.dllib.nn.layer import *
from bigdl.dllib.optim.optimizer import *
from bigdl.orca.learn.bigdl import Estimator

linear_model = Sequential().add(Linear(2, 2))
mse_criterion = MSECriterion()
est = Estimator.from_bigdl(model=linear_model, loss=mse_criterion, optimizer=Adam())
```
Then the user can perform distributed model training and inference as follows:
```python
# read a Spark DataFrame
df = spark.read.parquet("data.parquet")

# distributed model training
est.fit(df, 1, batch_size=4)

# distributed model inference
result_df = est.predict(df)
```
The input to the `fit` and `predict` methods can be a *Spark DataFrame* or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.
View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#module-bigdl.orca.learn.bigdl.estimator) for more details.
### 6. OpenVINO Estimator
The user may create an OpenVINO `Estimator` as follows:
```python
from bigdl.orca.learn.openvino import Estimator

model_path = "The/file_path/to/the/OpenVINO_IR_xml_file"
est = Estimator.from_openvino(model_path=model_path)
```
Then the user can perform distributed model inference as follows:
```python
# numpy ndarray
input_data = np.random.random([20, 4, 3, 224, 224])
result = est.predict(input_data)

# XShards
shards = XShards.partition({"x": input_data})
result_shards = est.predict(shards)
```
The input to the `predict` method can be an *XShards* or a *numpy array*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.
View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#orca-learn-openvino-estimator) for more details.
### 7. MPI Estimator
The Orca MPI Estimator runs distributed training jobs based on MPI.
#### Preparation
* Configure password-less ssh from the master node (the one you'll launch training from) to all other nodes.
* All hosts have the same working directory.
* All hosts have the same Python environment in the same location.
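The password-less ssh requirement is typically satisfied with `ssh-keygen` and `ssh-copy-id`; a minimal sketch (where `user`, `node1` and `node2` are placeholders for your own account and host names) looks like:

```shell
# on the master node: generate a key pair (skip if ~/.ssh/id_rsa already exists)
ssh-keygen -t rsa -N "" -f ~/.ssh/id_rsa

# copy the public key to every other node (repeat per host)
ssh-copy-id user@node1
ssh-copy-id user@node2

# verify: this should print the remote hostname without a password prompt
ssh user@node1 hostname
```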
#### Train
Then the user may create an `MPIEstimator` as follows:
```python
from bigdl.orca.learn.mpi import MPIEstimator

est = MPIEstimator(model_creator=model_creator,
                   optimizer_creator=optimizer_creator,
                   loss_creator=None,
                   metrics=None,
                   scheduler_creator=None,
                   config=config,
                   init_func=init,  # init the distributed environment for MPI if any
                   hosts=hosts,
                   workers_per_node=workers_per_node,
                   env=None)
```
Then the user can perform distributed model training as follows:
```python
# read a Spark DataFrame
df = spark.read.parquet("data.parquet")

# distributed model training
est.fit(data=df, epochs=1, batch_size=4, feature_col="feature", label_cols="label")
```
The input to the `fit` method can be a Spark DataFrame or a callable function that returns a `torch.utils.data.DataLoader`.
View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#orca-learn-mpi-mpi-estimator) for more details.