# Use Spark DataFrames for Deep Learning
---
![](../../../image/colab_logo_32px.png)[Run in Google Colab](https://colab.research.google.com/github/intel-analytics/BigDL/blob/branch-2.0/python/orca/colab-notebook/quickstart/ncf_dataframe.ipynb)  ![](../../../image/GitHub-Mark-32px.png)[View source on GitHub](https://github.com/intel-analytics/BigDL/blob/branch-2.0/python/orca/colab-notebook/quickstart/ncf_dataframe.ipynb)
---
**In this guide we will describe how to use Apache Spark DataFrames to scale out data processing for distributed deep learning.**
The dataset used in this guide is [movielens-1M](https://grouplens.org/datasets/movielens/1m/), which contains 1 million ratings (on a scale of 1 to 5) from 6,000 users on 4,000 movies. We will read the data into a Spark DataFrame and directly use the Spark DataFrame as the input to distributed training.
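Before reading the data, an Orca context needs to be initialized so that `OrcaContext.get_spark_session()` below can return a Spark session. The following is a minimal local-mode sketch; the `cluster_mode`, `cores` and `memory` values are illustrative and should be adapted to your own environment.
```python
from bigdl.orca import init_orca_context

# illustrative local-mode settings; adjust cluster_mode/cores/memory to your environment
sc = init_orca_context(cluster_mode="local", cores=4, memory="4g")
```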
### 1. Read input data into Spark DataFrame
First, read the input data into a Spark DataFrame.
```python
from bigdl.orca import OrcaContext
spark = OrcaContext.get_spark_session()
# read the data as CSV and assign column names
# (new_rating_files points to the MovieLens rating file(s) prepared beforehand)
df = spark.read.csv(new_rating_files, sep=':', inferSchema=True).toDF(
    "user", "item", "label", "timestamp")
```
```
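If you want to sanity-check the loaded data, you can optionally inspect the schema and a few rows:
```python
# optional sanity check of the loaded DataFrame
df.printSchema()
df.show(5)
```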
### 2. Process data using Spark DataFrame
Next, process the data using Spark DataFrame operations.
```python
# shift labels to start from 0: ratings range from 1 to 5, while the model's output classes are indexed from 0 to 4
df = df.withColumn('label', df.label - 1)
# split into train/test datasets with a fixed random seed
train_data, test_data = df.randomSplit([0.8, 0.2], seed=100)
```
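The model definition in the next step refers to `max_user_id` and `max_item_id`, the largest user and item IDs in the dataset, which are used to size the embedding layers. A minimal sketch of how they could be computed from the DataFrame, assuming the column names above:
```python
from pyspark.sql import functions as F

# largest user/item IDs in the data, used to size the embedding layers below
max_ids = df.agg(F.max("user").alias("max_user"),
                 F.max("item").alias("max_item")).collect()[0]
max_user_id, max_item_id = max_ids["max_user"], max_ids["max_item"]
```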
### 3. Define NCF model
This example defines the NCF model in the _Creator Function_ using TensorFlow 2 APIs as follows.
```python
from tensorflow import keras
import tensorflow as tf

def model_creator(config):
    embedding_size = 16
    user = keras.layers.Input(dtype=tf.int32, shape=(None,))
    item = keras.layers.Input(dtype=tf.int32, shape=(None,))

    # GMF branch: element-wise product of user and item embeddings
    # (max_user_id and max_item_id are the largest IDs in the dataset, computed beforehand)
    with tf.name_scope("GMF"):
        user_embed_GMF = keras.layers.Embedding(max_user_id + 1, embedding_size)(user)
        item_embed_GMF = keras.layers.Embedding(max_item_id + 1, embedding_size)(item)
        GMF = keras.layers.Multiply()([user_embed_GMF, item_embed_GMF])

    # MLP branch: concatenated embeddings fed through fully-connected layers
    with tf.name_scope("MLP"):
        user_embed_MLP = keras.layers.Embedding(max_user_id + 1, embedding_size)(user)
        item_embed_MLP = keras.layers.Embedding(max_item_id + 1, embedding_size)(item)
        interaction = tf.concat([user_embed_MLP, item_embed_MLP], axis=-1)
        layer1_MLP = keras.layers.Dense(units=embedding_size * 2, activation='relu')(interaction)
        layer1_MLP = keras.layers.Dropout(rate=0.2)(layer1_MLP)
        layer2_MLP = keras.layers.Dense(units=embedding_size, activation='relu')(layer1_MLP)
        layer2_MLP = keras.layers.Dropout(rate=0.2)(layer2_MLP)
        layer3_MLP = keras.layers.Dense(units=embedding_size // 2, activation='relu')(layer2_MLP)
        layer3_MLP = keras.layers.Dropout(rate=0.2)(layer3_MLP)

    # concatenate the two branches and predict one of the 5 rating classes
    with tf.name_scope("concatenation"):
        concatenation = tf.concat([GMF, layer3_MLP], axis=-1)
        outputs = keras.layers.Dense(units=5, activation='softmax')(concatenation)

    model = keras.Model(inputs=[user, item], outputs=outputs)
    model.compile(optimizer="adam",
                  loss="sparse_categorical_crossentropy",
                  metrics=['accuracy'])
    return model
```
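Since `model_creator` is a plain Python function, you can optionally build the model once on the driver to inspect its structure. This assumes `max_user_id` and `max_item_id` are already defined, as computed above.
```python
# optional: build the model locally and print a summary of its layers
model = model_creator(config={})
model.summary()
```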
### 4. Fit with Orca Estimator
Finally, run distributed model training and evaluation on the Spark DataFrames directly.
```python
import os

from bigdl.orca.learn.tf2 import Estimator

# create an Estimator (the model accepts two inputs and one label)
est = Estimator.from_keras(model_creator=model_creator)

# fit with the Estimator (epochs and batch_size are hyperparameters you define)
stats = est.fit(train_data,
                epochs=epochs,
                batch_size=batch_size,
                feature_cols=['user', 'item'],  # which column(s) to use as inputs
                label_cols=['label'],  # which column(s) to use as labels
                steps_per_epoch=800000 // batch_size,
                validation_data=test_data,
                validation_steps=200000 // batch_size)

# save the trained model (model_dir is the checkpoint directory you choose)
checkpoint_path = os.path.join(model_dir, "NCF.ckpt")
est.save(checkpoint_path)

# evaluate with the Estimator
stats = est.evaluate(test_data,
                     feature_cols=['user', 'item'],  # which column(s) to use as inputs
                     label_cols=['label'],  # which column(s) to use as labels
                     num_steps=100000 // batch_size)
est.shutdown()
print(stats)
```
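After training, the saved model can be reloaded into an Estimator and used for distributed inference on a Spark DataFrame. The sketch below is illustrative only; the exact `load`/`predict` behavior (for example, the name of the prediction column) may differ depending on your Orca version. Stop the Orca context when you are done.
```python
from bigdl.orca import stop_orca_context

# illustrative sketch: reload the checkpoint and run distributed inference
est = Estimator.from_keras(model_creator=model_creator)
est.load(checkpoint_path)

# predict on the Spark DataFrame; the result is expected to contain a prediction column
pred_df = est.predict(test_data, feature_cols=['user', 'item'], batch_size=batch_size)
pred_df.show(5)

est.shutdown()
stop_orca_context()
```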