# Use Distributed Pandas for Deep Learning




In this guide we describe how to use XShards to scale out Pandas data processing for distributed deep learning.

## 1. Read input data into XShards of Pandas DataFrame

First, read CSV, JSON or Parquet files into an XShards of Pandas DataFrame (i.e., a distributed and sharded dataset where each partition contains a Pandas DataFrame), as shown below:

```python
import numpy as np
from bigdl.orca.data.pandas import read_csv

# Read the rating files into an XShards of Pandas DataFrame
full_data = read_csv(new_rating_files, sep=':', header=None,
                     names=['user', 'item', 'label'], usecols=[0, 1, 2],
                     dtype={0: np.int32, 1: np.int32, 2: np.int32})
```
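
Note that every step in this guide assumes an Orca context has already been initialized. A minimal sketch for local mode is shown below; the `cluster_mode`, `cores` and `memory` values are illustrative only and should be adjusted to your environment.

```python
from bigdl.orca import init_orca_context, stop_orca_context

# Initialize an Orca context before reading or processing any XShards
# (the resource settings below are illustrative only)
sc = init_orca_context(cluster_mode="local", cores=4, memory="4g")

# ... run the remaining steps of this guide, then stop the context:
# stop_orca_context()
```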

## 2. Process Pandas DataFrames using XShards

Next, use XShards to efficiently process large Pandas DataFrames in a distributed and data-parallel fashion. You can run standard Python code on each partition using `XShards.transform_shard`, as shown below:

```python
from sklearn.model_selection import train_test_split

# Shift labels to start from 0: ratings range from 1 to 5,
# while the model's output classes range from 0 to 4
def update_label(df):
    df['label'] = df['label'] - 1
    return df

full_data = full_data.transform_shard(update_label)

# Split each partition into train/test DataFrames
def split_train_test(data):
    train, test = train_test_split(data, test_size=0.2, random_state=100)
    return train, test

train_data, test_data = full_data.transform_shard(split_train_test).split()
```
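
The NCF model defined in the next step needs the largest user and item IDs in the dataset (`max_user_id` and `max_item_id`). One possible way to compute them is sketched below; it assumes `XShards.collect()` gathers each partition's result to the driver, and the `get_max_ids` helper is not part of the original guide.

```python
import pandas as pd

# Hedged sketch: reduce each partition to its local maxima first, then take
# the global maximum of the small per-partition results on the driver
def get_max_ids(df):
    return pd.DataFrame({'user': [df['user'].max()], 'item': [df['item'].max()]})

maxima = full_data.transform_shard(get_max_ids).collect()
max_user_id = max(df['user'].iloc[0] for df in maxima)
max_item_id = max(df['item'].iloc[0] for df in maxima)
```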

## 3. Define NCF model

Define the NCF (Neural Collaborative Filtering) model using TensorFlow 1.15 APIs:

```python
import tensorflow as tf

class NCF(object):
    def __init__(self, embed_size, user_size, item_size):
        self.user = tf.placeholder(dtype=tf.int32, shape=(None,))
        self.item = tf.placeholder(dtype=tf.int32, shape=(None,))
        self.label = tf.placeholder(dtype=tf.int32, shape=(None,))

        with tf.name_scope("GMF"):
            user_embed_GMF = tf.contrib.layers.embed_sequence(self.user, vocab_size=user_size + 1,
                                                              embed_dim=embed_size)
            item_embed_GMF = tf.contrib.layers.embed_sequence(self.item, vocab_size=item_size + 1,
                                                              embed_dim=embed_size)
            GMF = tf.multiply(user_embed_GMF, item_embed_GMF)

        with tf.name_scope("MLP"):
            user_embed_MLP = tf.contrib.layers.embed_sequence(self.user, vocab_size=user_size + 1,
                                                              embed_dim=embed_size)
            item_embed_MLP = tf.contrib.layers.embed_sequence(self.item, vocab_size=item_size + 1,
                                                              embed_dim=embed_size)
            interaction = tf.concat([user_embed_MLP, item_embed_MLP], axis=-1)
            layer1_MLP = tf.layers.dense(inputs=interaction, units=embed_size * 2)
            layer1_MLP = tf.layers.dropout(layer1_MLP, rate=0.2)
            layer2_MLP = tf.layers.dense(inputs=layer1_MLP, units=embed_size)
            layer2_MLP = tf.layers.dropout(layer2_MLP, rate=0.2)
            layer3_MLP = tf.layers.dense(inputs=layer2_MLP, units=embed_size // 2)
            layer3_MLP = tf.layers.dropout(layer3_MLP, rate=0.2)

        # Concatenate the GMF and MLP parts together
        with tf.name_scope("concatenation"):
            concatenation = tf.concat([GMF, layer3_MLP], axis=-1)
            self.logits = tf.layers.dense(inputs=concatenation, units=5)
            self.logits_softmax = tf.nn.softmax(self.logits)
            self.class_number = tf.argmax(self.logits_softmax, 1)

        with tf.name_scope("loss"):
            self.loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(
                labels=self.label, logits=self.logits, name='loss'))

        with tf.name_scope("optimization"):
            self.optim = tf.train.AdamOptimizer(1e-3, name='Adam')
            self.optimizer = self.optim.minimize(self.loss)

# max_user_id and max_item_id are the largest user/item IDs in the dataset
embedding_size = 16
model = NCF(embedding_size, max_user_id, max_item_id)
```
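
The Estimator created in the next step also takes a `model_dir` for saving checkpoints and summaries. A minimal placeholder is shown below; the path is purely hypothetical and any writable local or shared directory can be used.

```python
# Hypothetical checkpoint directory for the Estimator (illustration only)
model_dir = "/tmp/ncf_model"
```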

## 4. Fit with Orca Estimator

Finally, run distributed model training and inference directly on the XShards of Pandas DataFrames.

```python
from bigdl.orca.learn.tf.estimator import Estimator

# Create an Estimator
estimator = Estimator.from_graph(
    inputs=[model.user, model.item],  # the model accepts two inputs (user, item)
    outputs=[model.class_number],
    labels=[model.label],             # and one label
    loss=model.loss,
    optimizer=model.optim,
    model_dir=model_dir,
    metrics={"loss": model.loss})

# Fit the Estimator
estimator.fit(data=train_data,
              batch_size=1280,
              epochs=1,
              feature_cols=['user', 'item'],  # which columns to use as model inputs
              label_cols=['label'],           # which columns to use as labels
              validation_data=test_data)
```
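
After training, the same Estimator can be reused for distributed evaluation and inference on the test XShards. The sketch below assumes the Estimator's `evaluate` and `predict` methods take the same `feature_cols`/`label_cols` arguments as `fit`.

```python
# Hedged sketch: evaluate the trained model on the held-out shards
eval_result = estimator.evaluate(data=test_data,
                                 batch_size=1280,
                                 feature_cols=['user', 'item'],
                                 label_cols=['label'])
print(eval_result)

# Run distributed inference; predictions are returned as an XShards
predictions = estimator.predict(data=test_data,
                                batch_size=1280,
                                feature_cols=['user', 'item'])
```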