Use Spark DataFrame for Deep Learning
In this guide, we describe how to use Apache Spark DataFrames to scale out data processing for distributed deep learning.
The dataset used in this guide is MovieLens-1M, which contains 1 million ratings (on a scale of 1 to 5) from 6000 users on 4000 movies. We will read the data into a Spark DataFrame and use it directly as the input to distributed training.
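The code in the steps below assumes that an Orca context has already been initialized and that new_rating_files points to a ratings file that uses ':' as the field separator. A minimal local setup might look like the following sketch (the cluster_mode arguments and the file path are placeholder assumptions, not part of the original example):
from bigdl.orca import init_orca_context
# initialize Orca locally; use cluster_mode="yarn-client", "k8s", etc. on a cluster
init_orca_context(cluster_mode="local", cores=4, memory="4g")
# placeholder path to a preprocessed MovieLens-1M ratings file with ':' as separator
new_rating_files = "./ml-1m/ratings_processed.dat"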
1. Read input data into Spark DataFrame
First, read the input data into a Spark DataFrame.
from bigdl.orca import OrcaContext
spark = OrcaContext.get_spark_session()
# read the CSV file, inferring column types and specifying the column names
df = spark.read.csv(new_rating_files, sep=':', inferSchema=True).toDF(
  "user", "item", "label", "timestamp")
2. Process data using Spark DataFrame
Next, process the data using Spark DataFrame operations.
# update the labels to start from 0, since ratings range from 1 to 5 while the model's output class indices range from 0 to 4
df = df.withColumn('label', df.label-1)
# split into train/test datasets (80/20) with a fixed random seed
train_data, test_data = df.randomSplit([0.8, 0.2], 100)
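The model creator function in the next step references max_user_id and max_item_id, which are not defined above. A minimal sketch of how they could be obtained from the DataFrame (assuming the column names used here) is:
# largest user and item IDs, used to size the embedding tables in the model
max_user_id = df.agg({"user": "max"}).collect()[0][0]
max_item_id = df.agg({"item": "max"}).collect()[0][0]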
3. Define NCF model
This example defines the NCF (Neural Collaborative Filtering) model in the model creator function using TensorFlow 2 APIs as follows.
from tensorflow import keras
import tensorflow as tf
def model_creator(config):
    embedding_size=16
    user = keras.layers.Input(dtype=tf.int32, shape=(None,))
    item = keras.layers.Input(dtype=tf.int32, shape=(None,))
    with tf.name_scope("GMF"):
        # embedding tables are sized by the largest user/item ID, computed from the data beforehand
        user_embed_GMF = keras.layers.Embedding(max_user_id + 1, embedding_size)(user)
        item_embed_GMF = keras.layers.Embedding(max_item_id + 1, embedding_size)(item)
        GMF = keras.layers.Multiply()([user_embed_GMF, item_embed_GMF])
    with tf.name_scope("MLP"):
        user_embed_MLP = keras.layers.Embedding(max_user_id + 1, embedding_size)(user)
        item_embed_MLP = keras.layers.Embedding(max_item_id + 1, embedding_size)(item)
        interaction = tf.concat([user_embed_MLP, item_embed_MLP], axis=-1)
        layer1_MLP = keras.layers.Dense(units=embedding_size * 2, activation='relu')(interaction)
        layer1_MLP = keras.layers.Dropout(rate=0.2)(layer1_MLP)
        layer2_MLP = keras.layers.Dense(units=embedding_size, activation='relu')(layer1_MLP)
        layer2_MLP = keras.layers.Dropout(rate=0.2)(layer2_MLP)
        layer3_MLP = keras.layers.Dense(units=embedding_size // 2, activation='relu')(layer2_MLP)
        layer3_MLP = keras.layers.Dropout(rate=0.2)(layer3_MLP)
    # Concatenate the GMF and MLP branches
    with tf.name_scope("concatenation"):
        concatenation = tf.concat([GMF, layer3_MLP], axis=-1)
        outputs = keras.layers.Dense(units=5, activation='softmax')(concatenation)
    model = keras.Model(inputs=[user, item], outputs=outputs)
    model.compile(optimizer="adam",
                  loss="sparse_categorical_crossentropy",
                  metrics=['accuracy'])
    return model
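As an optional sanity check, the model can be built once on the driver before distributed training (assuming max_user_id and max_item_id are defined as shown earlier):
# build the model locally and print its architecture; the config argument is unused here
model = model_creator({})
model.summary()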
4. Fit with Orca Estimator
Finally, run distributed model training and evaluation directly on the Spark DataFrames.
import os
from bigdl.orca.learn.tf2 import Estimator
# create an Estimator
est = Estimator.from_keras(model_creator=model_creator)  # the model accepts two inputs and one label
# fit with Estimator (epochs and batch_size are user-defined hyperparameters)
stats = est.fit(train_data,
                epochs=epochs,
                batch_size=batch_size,
                feature_cols=['user', 'item'],  # columns to use as model inputs
                label_cols=['label'],  # column(s) to use as labels
                steps_per_epoch=800000 // batch_size,
                validation_data=test_data,
                validation_steps=200000 // batch_size)
# save the trained model; model_dir is a user-defined directory for checkpoints
checkpoint_path = os.path.join(model_dir, "NCF.ckpt")
est.save(checkpoint_path)
# evaluate with Estimator
stats = est.evaluate(test_data,
                     feature_cols=['user', 'item'],  # columns to use as model inputs
                     label_cols=['label'],  # column(s) to use as labels
                     num_steps=100000 // batch_size)
est.shutdown()
print(stats)
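When you are done, stop the Orca context to release the underlying resources. If you later want to run inference with the saved model, a possible sketch (the load and predict calls follow the Orca TF2 Estimator API but are not part of the original example, so verify them against your BigDL version) is:
from bigdl.orca import stop_orca_context
# recreate an Estimator, reload the trained weights and run distributed inference
est = Estimator.from_keras(model_creator=model_creator)
est.load(checkpoint_path)
predictions = est.predict(test_data, feature_cols=['user', 'item'])
predictions.show(5)
est.shutdown()
# stop the Orca context when everything is finished
stop_orca_context()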