# Distributed Training and Inference

---

**Orca `Estimator` provides sklearn-style APIs for transparently distributed model training and inference**

### **1. Estimator**

To perform distributed training and inference, the user can first create an Orca `Estimator` from any standard (single-node) TensorFlow, Keras or PyTorch model, and then call the `Estimator.fit` or `Estimator.predict` method (using the [data-parallel processing pipeline](./data-parallel-processing.md) as input).

Under the hood, the Orca `Estimator` replicates the model on each node in the cluster, feeds the data partition (generated by the data-parallel processing pipeline) on each node to the local model replica, and synchronizes model parameters using one of several *backend* technologies (such as *Horovod*, `tf.distribute.MirroredStrategy`, `torch.distributed`, or the parameter sync layer in [*BigDL*](https://github.com/intel-analytics/BigDL)).
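
The following is a minimal pure-Python sketch of the "replicate, feed local partition, synchronize" loop described above, for a one-weight linear model `y = w * x`. It is an illustration, not Orca code: workers are simulated in a single process, and averaging their gradients stands in for whatever synchronization the chosen backend actually performs (for example an allreduce). The names `local_gradient` and `train` are hypothetical.

```python
# A toy, single-process simulation of synchronous data-parallel training.
# Hypothetical illustration only; it does not use any Orca/BigDL API.

def local_gradient(w, part):
    """Gradient of the mean squared error of y = w * x on one worker's partition."""
    return sum(2 * (w * x - y) * x for x, y in part) / len(part)


def train(data, num_workers=4, epochs=50, lr=0.01):
    # one partition per worker (strided split; assumes len(data) >= num_workers)
    parts = [data[i::num_workers] for i in range(num_workers)]
    replicas = [0.0] * num_workers  # every replica starts from the same weight
    for _ in range(epochs):
        # each replica computes a gradient on its own partition only
        grads = [local_gradient(w, p) for w, p in zip(replicas, parts)]
        # synchronize: average the gradients across workers (like an allreduce)
        avg_grad = sum(grads) / num_workers
        # every replica applies the same update, so the replicas stay identical
        replicas = [w - lr * avg_grad for w in replicas]
    return replicas


if __name__ == "__main__":
    data = [(float(x), 3.0 * x) for x in range(1, 9)]  # true weight is 3.0
    print(train(data))
```

Because every replica starts from the same weight and applies the same averaged update, the replicas never diverge; with equal-sized partitions, as here, the averaged gradient also equals the gradient over the full dataset.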

### **2. TensorFlow/Keras Estimator**

#### **2.1 TensorFlow 1.15 and Keras 2.3**

There are two ways to create an Estimator for TensorFlow 1.15: from a low-level computation graph or from a Keras model. Examples are as follows:

TensorFlow Computation Graph:

```python
import tensorflow as tf
from bigdl.orca.learn.tf.estimator import Estimator

# define inputs to the graph
images = tf.placeholder(dtype=tf.float32, shape=(None, 28, 28, 1))
labels = tf.placeholder(dtype=tf.int32, shape=(None,))

# define the network and loss (lenet is a user-defined function that builds the network)
logits = lenet(images)
loss = tf.reduce_mean(tf.losses.sparse_softmax_cross_entropy(logits=logits, labels=labels))

# define a metric (accuracy is a user-defined helper)
acc = accuracy(logits, labels)

# create an estimator using endpoints of the graph
est = Estimator.from_graph(inputs=images,
                           outputs=logits,
                           labels=labels,
                           loss=loss,
                           optimizer=tf.train.AdamOptimizer(),
                           metrics={"acc": acc})
```

Keras Model:

```python
model = create_keras_lenet_model()  # a user-defined function returning a Keras model
model.compile(optimizer=keras.optimizers.RMSprop(),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
est = Estimator.from_keras(keras_model=model)
```

Then users can perform distributed model training and inference as follows:

```python
import tensorflow_datasets as tfds

dataset = tfds.load(name="mnist", split="train")
dataset = dataset.map(preprocess)  # preprocess is a user-defined function
est.fit(data=dataset,
        batch_size=320,
        epochs=max_epoch)
# df is a Spark DataFrame with an 'image' column
predictions = est.predict(data=df,
                          feature_cols=['image'])
```

The `data` argument in the `fit` method can be a Spark DataFrame, an *XShards* or a `tf.data.Dataset`. The `data` argument in the `predict` method can be a Spark DataFrame or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.

View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#module-bigdl.orca.learn.tf.estimator) for more details.

#### **2.2 TensorFlow 2.x and Keras 2.4+**

**Using `ray` or *Horovod* backend**

Users can create an `Estimator` for TensorFlow 2.x from a Keras model (using a _Model Creator Function_) when the backend is `ray` (currently the default for TF2) or *Horovod*. For example:

```python
from tensorflow import keras
from bigdl.orca.learn.tf2 import Estimator

def model_creator(config):
    model = create_keras_lenet_model()  # a user-defined function returning a Keras model
    model.compile(optimizer=keras.optimizers.RMSprop(),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model

est = Estimator.from_keras(model_creator=model_creator, backend="ray")  # or backend="horovod"
```

The `model_creator` argument should be a function that takes a `config` dictionary and returns a compiled Keras model.
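
A model creator is a function rather than a model object, presumably so that each worker can build its own replica in its own process (a function is also typically easier to ship to remote workers than a live model). The sketch below mimics that pattern in plain Python; `ToyModel` and `build_replicas` are hypothetical stand-ins, not Orca APIs, and the "workers" are just loop iterations.

```python
class ToyModel:
    """Hypothetical stand-in for a compiled Keras model."""

    def __init__(self, lr):
        self.lr = lr
        self.weights = [0.0, 0.0]


def model_creator(config):
    # Same contract as in the text: take a config dict, return a ready-to-train model.
    return ToyModel(lr=config.get("lr", 1e-3))


def build_replicas(creator, config, num_workers):
    # Each simulated worker calls the creator itself,
    # so no model object is shared between workers.
    return [creator(config) for _ in range(num_workers)]


replicas = build_replicas(model_creator, {"lr": 0.01}, num_workers=3)
replicas[0].weights[0] = 1.0  # mutating one replica...
print([r.weights for r in replicas])  # ...does not affect the others
```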

Then users can perform distributed model training and inference as follows:

```python
import tensorflow_datasets as tfds

def train_data_creator(config, batch_size):
    dataset = tfds.load(name="mnist", split="train")
    dataset = dataset.map(preprocess)  # preprocess is a user-defined function
    dataset = dataset.batch(batch_size)
    return dataset

# max_epoch, total_size (number of training samples) and batch_size are user-defined
stats = est.fit(data=train_data_creator,
                epochs=max_epoch,
                batch_size=batch_size,
                steps_per_epoch=total_size // batch_size)
# df is a Spark DataFrame with an 'image' column
predictions = est.predict(data=df,
                          feature_cols=['image'])
```

The `data` argument in the `fit` method can be a Spark DataFrame, an *XShards* or a *Data Creator Function* (that returns a `tf.data.Dataset`). The `data` argument in the `predict` method can be a Spark DataFrame or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.
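
The training snippet relies on two small contracts: the data creator is called with `config` and `batch_size` and returns batched data, and `steps_per_epoch = total_size // batch_size` uses floor division, so a final partial batch is not counted as a step. Here is a pure-Python sketch under those assumptions, where the hypothetical `batched` helper stands in for `tf.data.Dataset.batch` and `config["total_size"]` is an illustrative key, not an Orca convention.

```python
def batched(records, batch_size):
    """Yield consecutive batches; the last batch may be smaller (like Dataset.batch)."""
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]


def train_data_creator(config, batch_size):
    # Hypothetical stand-in for the tfds-based creator above.
    records = list(range(config["total_size"]))
    return batched(records, batch_size)


config = {"total_size": 1000}
batch_size = 64
steps_per_epoch = config["total_size"] // batch_size  # floor division
batches = list(train_data_creator(config, batch_size))
print(steps_per_epoch, len(batches), len(batches[-1]))
```

With these numbers, 15 steps of 64 records cover 960 of the 1000 records; the remaining 40 records form a 16th, partial batch that this step count does not include.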

View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#orca-learn-tf2-tf2-ray-estimator) for more details.

**Using *spark* backend**

Users can create an `Estimator` for TensorFlow 2.x using the *spark* backend as follows:

```python
from tensorflow import keras
from bigdl.orca.learn.tf2 import Estimator

def compile_args(config):
    # read the learning rate from config, falling back to 1e-2
    lr = config.get("lr", 1e-2)
    return {
        "optimizer": keras.optimizers.SGD(lr),
        "loss": "mean_squared_error",
        "metrics": ["mean_squared_error"],
    }

def model_creator(config):
    model = create_keras_lenet_model()  # a user-defined function returning a Keras model
    model.compile(**compile_args(config))
    return model

# model_dir: a path on a shared filesystem that the executors can access
est = Estimator.from_keras(model_creator=model_creator,
                           config={"lr": 1e-2},
                           workers_per_node=2,
                           backend="spark",
                           model_dir=model_dir)
```

The `model_creator` argument should be a function that takes a `config` dictionary and returns a compiled Keras model.

The `model_dir` argument is required for the *spark* backend. In cluster mode, it should be a path on a shared filesystem that all executors can access.
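
The shared-filesystem requirement can be illustrated with a small sketch, assuming (as the requirement suggests) that workers write model state under `model_dir` and the driver reads it back afterwards. A temporary local directory plays the role of the shared path; the JSON file layout and the `worker_save`/`driver_load` names are hypothetical and do not reflect the spark backend's actual format.

```python
import json
import os
import tempfile


def worker_save(model_dir, rank, weights):
    # Each (simulated) worker writes its state to a file under model_dir.
    path = os.path.join(model_dir, f"worker_{rank}.json")
    with open(path, "w") as f:
        json.dump({"rank": rank, "weights": weights}, f)


def driver_load(model_dir, rank=0):
    # The driver can read the file only if model_dir is visible to it as well.
    path = os.path.join(model_dir, f"worker_{rank}.json")
    with open(path) as f:
        return json.load(f)["weights"]


# A temporary local directory stands in for the shared filesystem path.
with tempfile.TemporaryDirectory() as model_dir:
    for rank in range(2):  # two simulated workers, as with workers_per_node=2
        worker_save(model_dir, rank, [0.5, -1.25])
    loaded = driver_load(model_dir)
    files = sorted(os.listdir(model_dir))

print(loaded, files)
```

On a real cluster the workers run on different machines, so a purely local path on one executor would not be visible to the driver or to the other executors; a shared location avoids that.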

Then users can perform distributed model training and inference as follows:

```python
import tensorflow_datasets as tfds

def train_data_creator(config, batch_size):
    dataset = tfds.load(name="mnist", split="train")
    dataset = dataset.map(preprocess)  # preprocess is a user-defined function
    dataset = dataset.batch(batch_size)
    return dataset

# max_epoch, total_size (number of training samples) and batch_size are user-defined
stats = est.fit(data=train_data_creator,
                epochs=max_epoch,
                batch_size=batch_size,
                steps_per_epoch=total_size // batch_size)
# df is a Spark DataFrame with an 'image' column; collect() brings the results to the driver
predictions = est.predict(data=df,
                          feature_cols=['image']).collect()
```

The `data` argument in the `fit` method can be a Spark DataFrame, an *XShards* or a *Data Creator Function* (that returns a `tf.data.Dataset`). The `data` argument in the `predict` method can be a Spark DataFrame or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.

View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#orca-learn-tf2-tf2-spark-estimator) for more details.

***For more details, view the distributed TensorFlow training/inference [page]()<TODO: link to be added>.***

### **3. PyTorch Estimator**

**Using *BigDL* backend**

Users may create a PyTorch `Estimator` using the *BigDL* backend (currently the default for PyTorch) as follows:

```python
import torch
import torch.nn as nn
from bigdl.orca.learn.pytorch import Estimator

model = LeNet()  # a user-defined torch.nn.Module
model.train()
criterion = nn.NLLLoss()

adam = torch.optim.Adam(model.parameters(), args.lr)  # args: parsed command-line arguments
est = Estimator.from_torch(model=model, optimizer=adam, loss=criterion)
```

Then users can perform distributed model training and inference as follows:

```python
est.fit(data=train_loader, epochs=args.epochs)  # train_loader: a torch.utils.data.DataLoader
predictions = est.predict(xshards)  # xshards: an XShards of input data
```

The input to the `fit` method can be a `torch.utils.data.DataLoader`, a Spark DataFrame, an *XShards*, or a *Data Creator Function* (that returns a `torch.utils.data.DataLoader`). The input to the `predict` method should be a Spark DataFrame or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.

View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#orca-learn-pytorch-pytorch-spark-estimator) for more details.

**Using `torch.distributed` or *Horovod* backend**

Alternatively, users can create a PyTorch `Estimator` using the `torch.distributed` or *Horovod* backend by setting the `backend` argument to `"ray"` or `"horovod"`, respectively. In this case, the model and optimizer should be wrapped in _Creator Functions_. For example:
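
The optimizer creator receives the model because an optimizer is tied to the specific parameters it updates (compare `model.parameters()` in `optimizer_creator`). Below is a minimal pure-Python sketch of that dependency, with hypothetical `Param`, `ToyModel` and `ToySGD` classes standing in for PyTorch objects: each simulated worker builds its model first, then an optimizer over that model's own parameters.

```python
class Param:
    def __init__(self, value):
        self.value = value
        self.grad = 0.0


class ToyModel:
    """Hypothetical stand-in for a torch.nn.Module with a single parameter."""

    def __init__(self):
        self.w = Param(1.0)

    def parameters(self):
        return [self.w]


class ToySGD:
    """Hypothetical stand-in for a torch optimizer: it keeps references to parameters."""

    def __init__(self, params, lr):
        self.params = list(params)
        self.lr = lr

    def step(self):
        for p in self.params:
            p.value -= self.lr * p.grad


def model_creator(config):
    return ToyModel()


def optimizer_creator(model, config):
    # Bind the optimizer to *this* replica's parameters.
    return ToySGD(model.parameters(), config["lr"])


config = {"lr": 0.1}
workers = []
for _ in range(2):  # each simulated worker builds its own model, then its optimizer
    model = model_creator(config)
    workers.append((model, optimizer_creator(model, config)))

# Only worker 0 receives a gradient and takes a step.
model0, opt0 = workers[0]
model0.w.grad = 2.0
opt0.step()
print(model0.w.value, workers[1][0].w.value)
```

Passing a ready-made optimizer object instead would tie every worker to the parameters of whichever model that optimizer was created for, which is why both objects are created per worker here.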

```python
import torch
import torch.nn as nn
from bigdl.orca.learn.pytorch import Estimator

def model_creator(config):
    model = LeNet()  # a user-defined torch.nn.Module
    model.train()
    return model

def optimizer_creator(model, config):
    return torch.optim.Adam(model.parameters(), config["lr"])

est = Estimator.from_torch(model=model_creator,
                           optimizer=optimizer_creator,
                           loss=nn.NLLLoss(),
                           config={"lr": 1e-2},
                           backend="ray")  # or backend="horovod"
```

Then users can perform distributed model training and inference as follows:

```python
# train_loader_func is a Data Creator Function that returns a torch.utils.data.DataLoader
est.fit(data=train_loader_func, epochs=args.epochs)
# df is a Spark DataFrame with an 'image' column
predictions = est.predict(data=df,
                          feature_cols=['image'])
```

The input to the `fit` method can be a Spark DataFrame, an *XShards*, or a *Data Creator Function* (that returns a `torch.utils.data.DataLoader`). The `data` argument in the `predict` method can be a Spark DataFrame or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.

View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#orca-learn-pytorch-pytorch-ray-estimator) for more details.

***For more details, view the distributed PyTorch training/inference [page]()<TODO: link to be added>.***

### **4. MXNet Estimator**

The user may create an MXNet `Estimator` as follows:

```python
from mxnet import gluon
from bigdl.orca.learn.mxnet import Estimator, create_config

def get_model(config):
    net = LeNet()  # a user-defined mxnet.gluon.Block
    return net

def get_loss(config):
    return gluon.loss.SoftmaxCrossEntropyLoss()

config = create_config(log_interval=2, optimizer="adam",
                       optimizer_params={'learning_rate': 0.02})
est = Estimator.from_mxnet(config=config,
                           model_creator=get_model,
                           loss_creator=get_loss,
                           num_workers=2)
```

Then the user can perform distributed model training as follows:

```python
import mxnet as mx

def get_train_data_iter(config, kv):
    # data_ndarray and label_ndarray hold the user-defined training features and labels
    train = mx.io.NDArrayIter(data_ndarray, label_ndarray,
                              batch_size=config["batch_size"], shuffle=True)
    return train

est.fit(get_train_data_iter, epochs=2)
```

The input to the `fit` method can be an *XShards* or a *Data Creator Function* (that returns an MXNet `DataIter` or `DataLoader`). See the *data-parallel processing pipeline* [page](./data-parallel-processing.html) for more details.
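
The data creator above also receives `kv`, which in MXNet is the KVStore used for distributed training; its `rank` and `num_workers` attributes are commonly used to give each worker a distinct shard of the data. Whether you need to shard manually depends on how your data is prepared. The sketch below only illustrates the idea in plain Python, using a hypothetical `FakeKV` object with those two attributes in place of a real `mx.kv.KVStore`.

```python
from collections import namedtuple

# Hypothetical stand-in exposing the two KVStore attributes used below.
FakeKV = namedtuple("FakeKV", ["rank", "num_workers"])


def shard_for_worker(records, kv):
    """Return the records this worker should train on (strided sharding by rank)."""
    return records[kv.rank::kv.num_workers]


records = list(range(10))
shards = [shard_for_worker(records, FakeKV(rank=r, num_workers=3)) for r in range(3)]
print(shards)
```

Inside a creator such as `get_train_data_iter`, the same slicing could, for example, be applied to the feature and label arrays before building the iterator, so that the workers do not all train on identical data.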

View the related [Python API doc]()<TODO: link to be added> for more details.

### **5. BigDL Estimator**

The user may create a BigDL `Estimator` as follows:

```python
from bigdl.dllib.nn.criterion import *
from bigdl.dllib.nn.layer import *
from bigdl.dllib.optim.optimizer import *
from bigdl.orca.learn.bigdl import Estimator

linear_model = Sequential().add(Linear(2, 2))
mse_criterion = MSECriterion()
est = Estimator.from_bigdl(model=linear_model, loss=mse_criterion, optimizer=Adam())
```

Then the user can perform distributed model training and inference as follows:

```python
# read a Spark DataFrame (spark is an active SparkSession)
df = spark.read.parquet("data.parquet")

# distributed model training (1 epoch)
est.fit(df, 1, batch_size=4)

# distributed model inference
result_df = est.predict(df)
```

The input to the `fit` and `predict` methods can be a Spark DataFrame or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.html) for more details.

View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#module-bigdl.orca.learn.bigdl.estimator) for more details.

### **6. OpenVINO Estimator**

The user may create an OpenVINO `Estimator` as follows:

```python
from bigdl.orca.learn.openvino import Estimator

# path to the .xml file of an OpenVINO IR model
model_path = "The/file_path/to/the/OpenVINO_IR_xml_file"
est = Estimator.from_openvino(model_path=model_path)
```

Then the user can perform distributed model inference as follows:

```python
import numpy as np
from bigdl.orca.data import XShards

# ndarray input
input_data = np.random.random([20, 4, 3, 224, 224])
result = est.predict(input_data)

# XShards input
shards = XShards.partition({"x": input_data})
result_shards = est.predict(shards)
```

The input to the `predict` method can be an *XShards* or a NumPy array. See the *data-parallel processing pipeline* [page](./data-parallel-processing.html) for more details.
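
`XShards.partition` turns the in-memory input into shards that can be processed in parallel. As a rough mental model only (not the library's implementation), the sketch below splits a dict of equal-length sequences along the first dimension into contiguous shards. `partition_dict` and its `num_shards` parameter are hypothetical names used just for this illustration.

```python
def partition_dict(data, num_shards):
    """Split every value of a dict of equal-length sequences into contiguous shards."""
    lengths = {len(v) for v in data.values()}
    if len(lengths) != 1:
        raise ValueError("all values must have the same length")
    n = lengths.pop()
    size, rem = divmod(n, num_shards)
    shards, start = [], 0
    for i in range(num_shards):
        # the first `rem` shards get one extra element
        end = start + size + (1 if i < rem else 0)
        shards.append({k: v[start:end] for k, v in data.items()})
        start = end
    return shards


# 20 samples, mirroring the first dimension of input_data above (indices only)
shards = partition_dict({"x": list(range(20))}, num_shards=3)
print([len(s["x"]) for s in shards])
```

Each shard keeps the same `{"x": ...}` structure as the input, which is why the same dictionary layout can be used whether the data is predicted on as a whole or shard by shard.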

View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#orca-learn-openvino-estimator) for more details.