ipex-llm/docs/readthedocs/source/doc/Orca/Overview/distributed-training-inference.md
Cengguang Zhang 745aaef5df Orca: update doc for pytorch estimator backend. (#6723)
2022-11-23 19:09:01 +08:00

# Distributed Training and Inference
---
**Orca `Estimator` provides sklearn-style APIs for transparently distributed model training and inference**
### 1. Estimator
To perform distributed training and inference, the user can first create an Orca `Estimator` from any standard (single-node) TensorFlow, Keras or PyTorch model, and then call the `Estimator.fit` or `Estimator.predict` methods (using the [data-parallel processing pipeline](./data-parallel-processing.md) as input).
Under the hood, the Orca `Estimator` will replicate the model on each node in the cluster, feed the data partition (generated by the data-parallel processing pipeline) on each node to the local model replica, and synchronize model parameters using various *backend* technologies (such as *Horovod*, `tf.distribute.MirroredStrategy`, `torch.distributed`, or the parameter sync layer in [*BigDL*](https://github.com/intel-analytics/BigDL)).
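The replicate/feed/synchronize cycle described above can be illustrated with a toy, pure-Python sketch. It is not how any Orca backend is implemented: the workers are simulated in a loop, the model is a single weight, and the names `partition`, `local_gradient` and `train` are hypothetical. The synchronization step here is plain gradient averaging, which is only one of several possible strategies.

```python
# Toy data-parallel SGD for the 1-D linear model y = w * x (pure Python).
# Workers are simulated sequentially; a real backend would run them on
# separate nodes and exchange gradients over the network.

def partition(data, num_workers):
    """Split the dataset into one strided partition per worker."""
    return [data[i::num_workers] for i in range(num_workers)]

def local_gradient(w, shard):
    """Gradient of the mean squared error over one worker's partition."""
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)

def train(data, num_workers=2, lr=0.05, steps=200):
    shards = partition(data, num_workers)
    replicas = [0.0] * num_workers  # every replica starts from the same weight
    for _ in range(steps):
        # Each worker computes a gradient on its own partition only.
        grads = [local_gradient(w, s) for w, s in zip(replicas, shards)]
        # Synchronization: average the gradients, then apply the same update everywhere.
        avg = sum(grads) / num_workers
        replicas = [w - lr * avg for w in replicas]
    return replicas

data = [(x, 3.0 * x) for x in (1.0, 2.0, 3.0, 4.0)]
replicas = train(data)
```

Because every replica applies the same averaged update, the replicas stay identical after each step, which is the property the synchronization backends are there to preserve.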
### 2. TensorFlow/Keras Estimator
#### 2.1 TensorFlow 1.15 and Keras 2.3
There are two ways to create an Estimator for TensorFlow 1.15: from a low-level computation graph or from a Keras model. Examples are as follows:
TensorFlow Computation Graph:
```python
# define inputs to the graph
images = tf.placeholder(dtype=tf.float32, shape=(None, 28, 28, 1))
labels = tf.placeholder(dtype=tf.int32, shape=(None,))
# define the network and loss
logits = lenet(images)
loss = tf.reduce_mean(tf.losses.sparse_softmax_cross_entropy(logits=logits, labels=labels))
# define a metric
acc = accuracy(logits, labels)
# create an estimator using endpoints of the graph
est = Estimator.from_graph(inputs=images,
                           outputs=logits,
                           labels=labels,
                           loss=loss,
                           optimizer=tf.train.AdamOptimizer(),
                           metrics={"acc": acc})
```
Keras Model:
```python
model = create_keras_lenet_model()
model.compile(optimizer=keras.optimizers.RMSprop(),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
est = Estimator.from_keras(keras_model=model)
```
Then users can perform distributed model training and inference as follows:
```python
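# load and preprocess the training data as a tf.data.Dataset
dataset = tfds.load(name="mnist", split="train")
dataset = dataset.map(preprocess)
# distributed model training
est.fit(data=dataset,
        batch_size=320,
        epochs=max_epoch)
# distributed model inference (df is a Spark DataFrame with an 'image' column)
predictions = est.predict(data=df,
                          feature_cols=['image'])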
```
The `data` argument of the `fit` method can be a Spark DataFrame, an *XShards* or a `tf.data.Dataset`. The `data` argument of the `predict` method can be a Spark DataFrame or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.
View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#module-bigdl.orca.learn.tf.estimator) for more details.
#### 2.2 TensorFlow 2.x and Keras 2.4+
**Using `ray` or *Horovod* backend**
Users can create an `Estimator` for TensorFlow 2.x from a Keras model (using a _Model Creator Function_) when the backend is `ray` (currently default for TF2) or *Horovod*. For example:
```python
def model_creator(config):
    model = create_keras_lenet_model()
    model.compile(optimizer=keras.optimizers.RMSprop(),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model
# uses the default "ray" backend; pass backend="horovod" to use Horovod instead
est = Estimator.from_keras(model_creator=model_creator)
```
The `model_creator` argument should be a function that takes a `config` dictionary and returns a compiled Keras model.
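One plausible reason for passing a creator function rather than a model object is that each worker can then build its own replica locally from the same `config`. The sketch below illustrates that idea with stand-ins only: `ToyModel` and `launch_workers` are hypothetical names, not part of Orca, and the workers are simulated in a single process.

```python
class ToyModel:
    """Stand-in for a compiled Keras model."""
    def __init__(self, hidden_units):
        self.hidden_units = hidden_units
        self.weights = [0.0] * hidden_units

def model_creator(config):
    # Read hyperparameters from the config dictionary, with a fallback default.
    return ToyModel(hidden_units=config.get("hidden_units", 4))

def launch_workers(creator, config, num_workers):
    """Hypothetical helper: every simulated worker calls the creator itself."""
    return [creator(dict(config)) for _ in range(num_workers)]

models = launch_workers(model_creator, {"hidden_units": 8}, num_workers=3)
```

Each call returns a separate object, so changing the weights of one replica does not touch the others until a synchronization step copies or averages them.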
Then users can perform distributed model training and inference as follows:
```python
def train_data_creator(config, batch_size):
    dataset = tfds.load(name="mnist", split="train")
    dataset = dataset.map(preprocess)
    dataset = dataset.batch(batch_size)
    return dataset
stats = est.fit(data=train_data_creator,
                epochs=max_epoch,
                steps_per_epoch=total_size // batch_size)
predictions = est.predict(data=df,
                          feature_cols=['image'])
```
The `data` argument of the `fit` method can be a Spark DataFrame, an *XShards* or a *Data Creator Function* (that returns a `tf.data.Dataset`). The `data` argument of the `predict` method can be a Spark DataFrame or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.
View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#orca-learn-tf2-tf2-ray-estimator) for more details.
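The `steps_per_epoch=total_size // batch_size` expression in the example above uses floor division, so only full batches are counted. As a small worked example, assuming the 60,000-sample MNIST training split and a batch size of 320 (the helper name `epoch_plan` is hypothetical):

```python
def epoch_plan(total_size, batch_size):
    """Return (number of full batches, samples not covered by a full batch)."""
    steps = total_size // batch_size
    leftover = total_size % batch_size
    return steps, leftover

steps_per_epoch, leftover = epoch_plan(60000, 320)
# steps_per_epoch == 187, leftover == 160
```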
**Using *spark* backend**
Users can create an `Estimator` for TensorFlow 2.x using the *spark* backend as follows:
```python
def model_creator(config):
    model = create_keras_lenet_model()
    model.compile(**compile_args(config))
    return model
def compile_args(config):
    if "lr" in config:
        lr = config["lr"]
    else:
        lr = 1e-2
    args = {
        "optimizer": keras.optimizers.SGD(lr),
        "loss": "mean_squared_error",
        "metrics": ["mean_squared_error"]
    }
    return args
est = Estimator.from_keras(model_creator=model_creator,
                           config={"lr": 1e-2},
                           workers_per_node=2,
                           backend="spark",
                           model_dir=model_dir)
```
The `model_creator` argument should be a function that takes a `config` dictionary and returns a compiled Keras model.
The `model_dir` argument is required for the *spark* backend. In cluster mode, it should be a path on a shared filesystem that the executors can access.
Then users can perform distributed model training and inference as follows:
```python
def train_data_creator(config, batch_size):
    dataset = tfds.load(name="mnist", split="train")
    dataset = dataset.map(preprocess)
    dataset = dataset.batch(batch_size)
    return dataset
stats = est.fit(data=train_data_creator,
                epochs=max_epoch,
                steps_per_epoch=total_size // batch_size)
predictions = est.predict(data=df,
                          feature_cols=['image']).collect()
```
The `data` argument of the `fit` method can be a Spark DataFrame, an *XShards* or a *Data Creator Function* (that returns a `tf.data.Dataset`). The `data` argument of the `predict` method can be a Spark DataFrame or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.
View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#orca-learn-tf2-tf2-spark-estimator) for more details.
### 3. PyTorch Estimator
**Using *spark* backend**
Users may create a PyTorch `Estimator` using the *Spark* backend (currently default for PyTorch) as follows:
```python
def model_creator(config):
    model = LeNet()  # a torch.nn.Module
    model.train()
    return model
def optimizer_creator(model, config):
    return torch.optim.Adam(model.parameters(), config["lr"])
est = Estimator.from_torch(model=model_creator,
                           optimizer=optimizer_creator,
                           loss=nn.NLLLoss(),
                           config={"lr": 1e-2})
```
Then users can perform distributed model training and inference as follows:
```python
est.fit(data=train_loader, epochs=args.epochs)
predictions = est.predict(xshards)
```
The input to the `fit` method can be a `torch.utils.data.DataLoader`, a Spark DataFrame, an *XShards*, or a *Data Creator Function* (that returns a `torch.utils.data.DataLoader`). The input to the `predict` method should be a Spark DataFrame or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.
View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#orca-learn-pytorch-pytorch-spark-estimator) for more details.
**Using `torch.distributed` or *Horovod* backend**
Alternatively, users can create a PyTorch `Estimator` using the `torch.distributed` or *Horovod* backend by setting the `backend` argument to "ray" or "horovod". In this case, the `model` and `optimizer` should be wrapped in _Creator Functions_. For example:
```python
def model_creator(config):
    model = LeNet()  # a torch.nn.Module
    model.train()
    return model
def optimizer_creator(model, config):
    return torch.optim.Adam(model.parameters(), config["lr"])
est = Estimator.from_torch(model=model_creator,
                           optimizer=optimizer_creator,
                           loss=nn.NLLLoss(),
                           config={"lr": 1e-2},
                           backend="ray")  # or backend="horovod"
```
Then users can perform distributed model training and inference as follows:
```python
est.fit(data=train_loader_func, epochs=args.epochs)
predictions = est.predict(data=df,
                          feature_cols=['image'])
```
The input to the `fit` method can be a Spark DataFrame, an *XShards*, or a *Data Creator Function* (that returns a `torch.utils.data.DataLoader`). The `data` argument of the `predict` method can be a Spark DataFrame or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.md) for more details.
View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#orca-learn-pytorch-pytorch-ray-estimator) for more details.
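The `train_loader_func` passed to `fit` above is a *Data Creator Function*. The sketch below shows the general shape such a function might take, using a pure-Python stand-in so that it runs without PyTorch. `ToyLoader` is a hypothetical substitute for `torch.utils.data.DataLoader`, the `num_samples` config key is made up for illustration, and the `(config, batch_size)` signature is assumed to match the TensorFlow data creators shown earlier.

```python
import random

class ToyLoader:
    """Stand-in for torch.utils.data.DataLoader: yields (features, labels) batches."""
    def __init__(self, samples, batch_size, shuffle=False, seed=0):
        self.samples = list(samples)
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.rng = random.Random(seed)

    def __len__(self):
        # Number of batches, counting a final partial batch.
        return -(-len(self.samples) // self.batch_size)

    def __iter__(self):
        order = list(self.samples)
        if self.shuffle:
            self.rng.shuffle(order)
        for i in range(0, len(order), self.batch_size):
            chunk = order[i:i + self.batch_size]
            yield [x for x, _ in chunk], [y for _, y in chunk]

def train_loader_func(config, batch_size):
    # Assumed signature; a real creator would build a Dataset and return a DataLoader.
    samples = [([float(i)], i % 2) for i in range(config["num_samples"])]
    return ToyLoader(samples, batch_size, shuffle=True)

loader = train_loader_func({"num_samples": 10}, batch_size=4)
```

Building the loader inside the function means the dataset is constructed wherever the function is called, rather than being created once on the driver and shipped to the workers.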
### 4. MXNet Estimator
The user may create an MXNet `Estimator` as follows:
```python
from mxnet import gluon
from bigdl.orca.learn.mxnet import Estimator, create_config
def get_model(config):
    net = LeNet()  # a mxnet.gluon.Block
    return net
def get_loss(config):
    return gluon.loss.SoftmaxCrossEntropyLoss()
config = create_config(log_interval=2, optimizer="adam",
                       optimizer_params={'learning_rate': 0.02})
est = Estimator.from_mxnet(config=config,
                           model_creator=get_model,
                           loss_creator=get_loss,
                           num_workers=2)
```
Then the user can perform distributed model training as follows:
```python
import numpy as np
import mxnet as mx
# data_ndarray and label_ndarray are numpy arrays prepared by the user
def get_train_data_iter(config, kv):
    train = mx.io.NDArrayIter(data_ndarray, label_ndarray,
                              batch_size=config["batch_size"], shuffle=True)
    return train
est.fit(get_train_data_iter, epochs=2)
```
The input to the `fit` method can be an *XShards* or a *Data Creator Function* (that returns an MXNet `DataIter` or `DataLoader`). See the *data-parallel processing pipeline* [page](./data-parallel-processing.html) for more details.
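The data creator function above also receives a `kv` argument. MXNet `KVStore` objects expose `rank` and `num_workers`, so one way a creator could use `kv` is to select a disjoint slice of the data for each worker. The sketch below illustrates that idea with a stand-in; `FakeKV` and `shard_for_worker` are hypothetical, and whether a given pipeline needs this kind of manual sharding is an assumption.

```python
from collections import namedtuple

# Hypothetical stand-in exposing the two KVStore attributes used below.
FakeKV = namedtuple("FakeKV", ["rank", "num_workers"])

def shard_for_worker(samples, kv):
    """Return the strided slice of samples owned by the worker described by kv."""
    return samples[kv.rank::kv.num_workers]

samples = list(range(10))
shards = [shard_for_worker(samples, FakeKV(rank=r, num_workers=2)) for r in range(2)]
# shards == [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
```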
### 5. BigDL Estimator
The user may create a BigDL `Estimator` as follows:
```python
from bigdl.dllib.nn.criterion import *
from bigdl.dllib.nn.layer import *
from bigdl.dllib.optim.optimizer import *
from bigdl.orca.learn.bigdl import Estimator
linear_model = Sequential().add(Linear(2, 2))
mse_criterion = MSECriterion()
est = Estimator.from_bigdl(model=linear_model, loss=mse_criterion, optimizer=Adam())
```
Then the user can perform distributed model training and inference as follows:
```python
# read a Spark DataFrame
df = spark.read.parquet("data.parquet")
# distributed model training
est.fit(df, 1, batch_size=4)
# distributed model inference
result_df = est.predict(df)
```
The input to the `fit` and `predict` methods can be a *Spark DataFrame* or an *XShards*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.html) for more details.
View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#module-bigdl.orca.learn.bigdl.estimator) for more details.
### 6. OpenVINO Estimator
The user may create an OpenVINO `Estimator` as follows:
```python
from bigdl.orca.learn.openvino import Estimator
model_path = "The/file_path/to/the/OpenVINO_IR_xml_file"
est = Estimator.from_openvino(model_path=model_path)
```
Then the user can perform distributed model inference as follows:
```python
import numpy as np
from bigdl.orca.data import XShards
# predict on a numpy ndarray
input_data = np.random.random([20, 4, 3, 224, 224])
result = est.predict(input_data)
# predict on an XShards
shards = XShards.partition({"x": input_data})
result_shards = est.predict(shards)
```
The input to the `predict` method can be an *XShards* or a *numpy array*. See the *data-parallel processing pipeline* [page](./data-parallel-processing.html) for more details.
View the related [Python API doc](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Orca/orca.html#orca-learn-openvino-estimator) for more details.
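To give a rough idea of what predicting on partitioned data involves, the sketch below splits a dictionary of rows into contiguous shards along the first axis and runs a prediction function on each shard. It uses plain Python lists instead of numpy arrays, and `partition_dict` and `predict_shards` are hypothetical helpers, not the `XShards` or OpenVINO `Estimator` implementation.

```python
def partition_dict(data, num_shards):
    """Split every list in `data` into contiguous chunks along the first axis."""
    n = len(next(iter(data.values())))
    bounds = [n * i // num_shards for i in range(num_shards + 1)]
    return [{k: v[bounds[i]:bounds[i + 1]] for k, v in data.items()}
            for i in range(num_shards)]

def predict_shards(shards, predict_fn):
    """Apply predict_fn to the "x" entry of each shard independently."""
    return [predict_fn(shard["x"]) for shard in shards]

data = {"x": [[float(i)] for i in range(20)]}
shards = partition_dict(data, num_shards=4)
# Toy "model": doubles the single feature of every row.
results = predict_shards(shards, lambda batch: [row[0] * 2 for row in batch])
```

Since each shard is processed independently, the per-shard results can be computed in parallel and concatenated in shard order to recover the predictions for the full input.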