# Use Spark DataFrames for Deep Learning
---

[Run in Google Colab](https://colab.research.google.com/github/intel-analytics/BigDL/blob/main/python/orca/colab-notebook/quickstart/ncf_dataframe.ipynb) [View source on GitHub](https://github.com/intel-analytics/BigDL/blob/main/python/orca/colab-notebook/quickstart/ncf_dataframe.ipynb)

---
**In this guide we will describe how to use Apache Spark DataFrames to scale out data processing for distributed deep learning.**

The dataset used in this guide is [movielens-1M](https://grouplens.org/datasets/movielens/1m/), which contains about 1 million ratings (on a 5-level scale, from 1 to 5) from roughly 6,000 users on around 4,000 movies. We will read the data into a Spark DataFrame and use the DataFrame directly as the input to distributed training.
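Orca needs to be initialized before a Spark session can be obtained in the steps below. A minimal sketch for running in local mode follows; the `cluster_mode`, `cores` and `memory` values here are illustrative assumptions, so adjust them for your environment.

```python
from bigdl.orca import init_orca_context

# initialize Orca (and the underlying Spark context) in local mode;
# the resource settings below are example values
sc = init_orca_context(cluster_mode="local", cores=4, memory="4g")
```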
### 1. Read input data into Spark DataFrame
First, read the input data into a Spark DataFrame.
```python
from bigdl.orca import OrcaContext

spark = OrcaContext.get_spark_session()

# read the ratings CSV, specifying the column names
# (new_rating_files is the path to the MovieLens ratings file)
df = spark.read.csv(new_rating_files, sep=':', inferSchema=True).toDF(
    "user", "item", "label", "timestamp")
```
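The model creator in Step 3 sizes its embedding tables with `max_user_id` and `max_item_id`. A minimal sketch of deriving them from the DataFrame is shown below; the aggregation used here is an assumption, one of several equivalent ways to compute the maxima.

```python
# derive the largest user/item IDs in the dataset;
# these are used in Step 3 to size the embedding tables
max_user_id = df.agg({"user": "max"}).collect()[0]["max(user)"]
max_item_id = df.agg({"item": "max"}).collect()[0]["max(item)"]
```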
### 2. Process data using Spark DataFrame

Next, process the data using Spark DataFrame operations.
```python
# shift the labels to start from 0: the ratings range from 1 to 5,
# while the class indices expected by the model range from 0 to 4
df = df.withColumn('label', df.label - 1)

# split into train/test datasets with a fixed random seed
train_data, test_data = df.randomSplit([0.8, 0.2], seed=100)
```
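Before training, you can optionally verify the processed data and the split with standard DataFrame operations (note that `count()` triggers a full pass over the data):

```python
# quick sanity check of the processed data and the train/test split
train_data.show(5)
print("train count: %d, test count: %d" % (train_data.count(), test_data.count()))
```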
### 3. Define NCF model

This example defines the NCF (Neural Collaborative Filtering) model in the _Creator Function_ using TensorFlow 2 APIs as follows.
```python
from tensorflow import keras
import tensorflow as tf

def model_creator(config):
    embedding_size = 16
    user = keras.layers.Input(dtype=tf.int32, shape=(None,))
    item = keras.layers.Input(dtype=tf.int32, shape=(None,))

    # GMF branch: element-wise product of the user and item embeddings
    # (max_user_id and max_item_id are the largest IDs computed from the data in Step 1)
    with tf.name_scope("GMF"):
        user_embed_GMF = keras.layers.Embedding(max_user_id + 1, embedding_size)(user)
        item_embed_GMF = keras.layers.Embedding(max_item_id + 1, embedding_size)(item)
        GMF = keras.layers.Multiply()([user_embed_GMF, item_embed_GMF])

    # MLP branch: concatenated embeddings passed through a stack of dense layers
    with tf.name_scope("MLP"):
        user_embed_MLP = keras.layers.Embedding(max_user_id + 1, embedding_size)(user)
        item_embed_MLP = keras.layers.Embedding(max_item_id + 1, embedding_size)(item)
        interaction = tf.concat([user_embed_MLP, item_embed_MLP], axis=-1)
        layer1_MLP = keras.layers.Dense(units=embedding_size * 2, activation='relu')(interaction)
        layer1_MLP = keras.layers.Dropout(rate=0.2)(layer1_MLP)
        layer2_MLP = keras.layers.Dense(units=embedding_size, activation='relu')(layer1_MLP)
        layer2_MLP = keras.layers.Dropout(rate=0.2)(layer2_MLP)
        layer3_MLP = keras.layers.Dense(units=embedding_size // 2, activation='relu')(layer2_MLP)
        layer3_MLP = keras.layers.Dropout(rate=0.2)(layer3_MLP)

    # concatenate the two branches and predict one of the 5 rating classes
    with tf.name_scope("concatenation"):
        concatenation = tf.concat([GMF, layer3_MLP], axis=-1)
        outputs = keras.layers.Dense(units=5, activation='softmax')(concatenation)

    model = keras.Model(inputs=[user, item], outputs=outputs)
    model.compile(optimizer="adam",
                  loss="sparse_categorical_crossentropy",
                  metrics=['accuracy'])
    return model
```
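The model combines a generalized matrix factorization (GMF) branch with an MLP branch over separate user/item embeddings. Since `model_creator` is a plain Python function, you can optionally build the model once on the driver to verify the architecture before launching distributed training (this assumes `max_user_id` and `max_item_id` are already defined):

```python
# build the model locally to inspect the architecture
model = model_creator(config={})
model.summary()
```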
### 4. Fit with Orca Estimator

Finally, run distributed model training/inference on the Spark DataFrames directly.
```python
import os

from bigdl.orca.learn.tf2 import Estimator

# create an Estimator; the model accepts two inputs and one label
est = Estimator.from_keras(model_creator=model_creator)

# fit with the Estimator
# (epochs, batch_size and model_dir are assumed to be defined beforehand)
stats = est.fit(train_data,
                epochs=epochs,
                batch_size=batch_size,
                feature_cols=['user', 'item'],  # which column(s) to use as inputs
                label_cols=['label'],           # which column(s) to use as labels
                steps_per_epoch=800000 // batch_size,
                validation_data=test_data,
                validation_steps=200000 // batch_size)

# save the trained model
checkpoint_path = os.path.join(model_dir, "NCF.ckpt")
est.save(checkpoint_path)

# evaluate with the Estimator
stats = est.evaluate(test_data,
                     feature_cols=['user', 'item'],  # which column(s) to use as inputs
                     label_cols=['label'],           # which column(s) to use as labels
                     num_steps=100000 // batch_size)
est.shutdown()
print(stats)
```
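You can also run distributed inference on a Spark DataFrame with the Estimator's `predict` method (call it before `est.shutdown()`). A minimal sketch follows; treat the exact output schema of the returned DataFrame, e.g. a `prediction` column, as an assumption to verify against the Orca API documentation.

```python
# distributed inference on the test DataFrame (run before est.shutdown());
# the result is a new DataFrame carrying the model outputs
predictions = est.predict(test_data, feature_cols=['user', 'item'])
predictions.show(5)
```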