update databricks doc (#6035)
Co-authored-by: Zhou <jian.zhou@intel.com>
This commit is contained in:
parent d262848421
commit 936f8d2adb

1 changed file with 8 additions and 128 deletions
@@ -27,8 +27,8 @@ init_script = """
 
 # install other necessary libraries, here we install libraries needed in this tutorial
 /databricks/python/bin/pip install tensorflow==2.9.1
-/databricks/python/bin/pip install pyarrow==8.0.0
-/databricks/python/bin/pip install psutil
+/databricks/python/bin/pip install tqdm
+/databricks/python/bin/pip install torch==1.11.0+cpu torchvision==0.12.0+cpu tensorboard -f https://download.pytorch.org/whl/torch_stable.html
 
 # copy bigdl jars to databricks
 cp /databricks/python/lib/python3.8/site-packages/bigdl/share/*/lib/*.jar /databricks/jars
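To see what this first hunk amounts to in practice: in that section of the doc the init script is built as a Python string (the `init_script = """` context line) and written out from a notebook. Below is a minimal sketch of the updated script body being written to DBFS; the target path and the `#!/bin/bash` shebang are illustrative assumptions, and the doc's preceding script lines (elided by the hunk) are not reproduced here.

```python
# Sketch only -- run in a Databricks notebook, where `dbutils` is predefined.
# The script body mirrors the "+" lines of the hunk above; the DBFS path is
# a hypothetical example, not taken from the original doc.
init_script = """#!/bin/bash

# install other necessary libraries, here we install libraries needed in this tutorial
/databricks/python/bin/pip install tensorflow==2.9.1
/databricks/python/bin/pip install tqdm
/databricks/python/bin/pip install torch==1.11.0+cpu torchvision==0.12.0+cpu tensorboard -f https://download.pytorch.org/whl/torch_stable.html

# copy bigdl jars to databricks
cp /databricks/python/lib/python3.8/site-packages/bigdl/share/*/lib/*.jar /databricks/jars
"""

# Overwrite any previous version of the script so the cluster picks up the change.
dbutils.fs.put("dbfs:/databricks/scripts/init.sh", init_script, True)
```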
@@ -54,8 +54,8 @@ Create a file **init.sh**(or any other filename) in your computer, the file cont
 
 # install other necessary libraries, here we install libraries needed in this tutorial
 /databricks/python/bin/pip install tensorflow==2.9.1
-/databricks/python/bin/pip install pyarrow==8.0.0
-/databricks/python/bin/pip install psutil
+/databricks/python/bin/pip install tqdm
+/databricks/python/bin/pip install torch==1.11.0+cpu torchvision==0.12.0+cpu tensorboard -f https://download.pytorch.org/whl/torch_stable.html
 
 # copy bigdl jars to databricks
 cp /databricks/python/lib/python3.8/site-packages/bigdl/share/*/lib/*.jar /databricks/jars
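Both hunks make the same substitution (pyarrow and psutil dropped; tqdm and CPU-only torch/torchvision/tensorboard added), once for the notebook-generated script and once for the hand-written **init.sh**. As a hedged aside, here is one quick way to confirm a restarted cluster actually picked up the pinned versions, using only the Python 3.8 standard library:

```python
# Sketch only -- a notebook-cell sanity check that the cluster's Python
# environment matches the packages pinned by the updated init script.
from importlib.metadata import version, PackageNotFoundError

expected = {
    "tensorflow": "2.9.1",
    "torch": "1.11.0+cpu",
    "torchvision": "0.12.0+cpu",
    "tqdm": None,         # unpinned in the script, any version passes
    "tensorboard": None,  # unpinned in the script, any version passes
}

for pkg, want in expected.items():
    try:
        got = version(pkg)
    except PackageNotFoundError:
        print(f"{pkg}: MISSING - check the cluster's init script logs")
        continue
    status = "OK" if want is None or got == want else f"MISMATCH (expected {want})"
    print(f"{pkg}=={got}  {status}")
```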
@@ -103,132 +103,12 @@ Output on Databricks:
 
 
 
-**Run ncf_train example on Databricks**
+**Run Examples**
 
-Create a notebook and run the following example. Note that to make things simple, we are just generating some dummy data for this example.
+- [Keras example on Databricks](https://github.com/intel-analytics/BigDL/blob/main/python/orca/tutorial/databricks/tf_keras_ncf.ipynb)
+- [Pytorch example on Databricks](https://github.com/intel-analytics/BigDL/blob/main/python/orca/tutorial/databricks/pytorch_fashion_mnist.ipynb)
 
-```python
-import math
-import argparse
-import os
-import random
-
-from bigdl.orca import init_orca_context, stop_orca_context, OrcaContext
-from bigdl.orca.learn.tf2.estimator import Estimator
-from pyspark.sql.types import StructType, StructField, IntegerType
-
-
-def build_model(num_users, num_items, class_num, layers=[20, 10], include_mf=True, mf_embed=20):
-    import tensorflow as tf
-    from tensorflow.keras.layers import Input, Embedding, Dense, Flatten, concatenate, multiply
-
-    num_layer = len(layers)
-    user_input = Input(shape=(1,), dtype='int32', name='user_input')
-    item_input = Input(shape=(1,), dtype='int32', name='item_input')
-
-    mlp_embed_user = Embedding(input_dim=num_users, output_dim=int(layers[0] / 2),
-                               input_length=1)(user_input)
-    mlp_embed_item = Embedding(input_dim=num_items, output_dim=int(layers[0] / 2),
-                               input_length=1)(item_input)
-
-    user_latent = Flatten()(mlp_embed_user)
-    item_latent = Flatten()(mlp_embed_item)
-
-    mlp_latent = concatenate([user_latent, item_latent], axis=1)
-    for idx in range(1, num_layer):
-        layer = Dense(layers[idx], activation='relu',
-                      name='layer%d' % idx)
-        mlp_latent = layer(mlp_latent)
-
-    if include_mf:
-        mf_embed_user = Embedding(input_dim=num_users,
-                                  output_dim=mf_embed,
-                                  input_length=1)(user_input)
-        mf_embed_item = Embedding(input_dim=num_users,
-                                  output_dim=mf_embed,
-                                  input_length=1)(item_input)
-        mf_user_flatten = Flatten()(mf_embed_user)
-        mf_item_flatten = Flatten()(mf_embed_item)
-
-        mf_latent = multiply([mf_user_flatten, mf_item_flatten])
-        concated_model = concatenate([mlp_latent, mf_latent], axis=1)
-        prediction = Dense(class_num, activation='softmax', name='prediction')(concated_model)
-    else:
-        prediction = Dense(class_num, activation='softmax', name='prediction')(mlp_latent)
-
-    model = tf.keras.Model([user_input, item_input], prediction)
-    return model
-
-
-if __name__ == '__main__':
-    executor_cores = 2
-    lr = 0.001
-    epochs = 5
-    batch_size = 8000
-    model_dir = "/dbfs/FileStore/model/ncf/"
-    backend = "ray" # ray or spark
-    data_dir = './'
-    save_path = model_dir + "ncf.h5"
-
-    sc = init_orca_context(cluster_mode="spark-submit")
-
-    spark = OrcaContext.get_spark_session()
-
-    num_users, num_items = 6000, 3000
-    rdd = sc.range(0, 50000).map(
-        lambda x: [random.randint(0, num_users - 1), random.randint(0, num_items - 1), random.randint(0, 4)])
-    schema = StructType([StructField("user", IntegerType(), False),
-                         StructField("item", IntegerType(), False),
-                         StructField("label", IntegerType(), False)])
-    data = spark.createDataFrame(rdd, schema)
-    train, test = data.randomSplit([0.8, 0.2], seed=1)
-
-    config = {"lr": lr, "inter_op_parallelism": 4, "intra_op_parallelism": executor_cores}
-
-
-    def model_creator(config):
-        import tensorflow as tf
-
-        model = build_model(num_users, num_items, 5)
-        print(model.summary())
-        optimizer = tf.keras.optimizers.Adam(config["lr"])
-        model.compile(optimizer=optimizer,
-                      loss='sparse_categorical_crossentropy',
-                      metrics=['sparse_categorical_crossentropy', 'accuracy'])
-        return model
-
-
-    steps_per_epoch = math.ceil(train.count() / batch_size)
-    val_steps = math.ceil(test.count() / batch_size)
-
-    estimator = Estimator.from_keras(model_creator=model_creator,
-                                     verbose=False,
-                                     config=config,
-                                     backend=backend,
-                                     model_dir=model_dir)
-    estimator.fit(train,
-                  batch_size=batch_size,
-                  epochs=epochs,
-                  feature_cols=['user', 'item'],
-                  label_cols=['label'],
-                  steps_per_epoch=steps_per_epoch,
-                  validation_data=test,
-                  validation_steps=val_steps)
-
-    predictions = estimator.predict(test,
-                                    batch_size=batch_size,
-                                    feature_cols=['user', 'item'],
-                                    steps=val_steps)
-    print("Predictions on validation dataset:")
-    predictions.show(5, truncate=False)
-
-    print("Saving model to: ", save_path)
-    estimator.save(save_path)
-
-    # load with estimator.load(save_path)
-
-    stop_orca_context()
-```
+> Note that if you want to save model to DBFS, or load model from DBFS, the save/load path should be the **File API Format** on Databricks, which means your save/load path should start with `/dbfs`.
 
 ### **6. Other ways to install third-party libraries on Databricks if necessary**
 
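The `> Note` line added above deserves a concrete illustration. The deleted example already used the File API Format (`model_dir = "/dbfs/FileStore/model/ncf/"`), and its `estimator.save`/`estimator.load` calls carry over unchanged to the linked notebooks. A minimal sketch, assuming an `estimator` constructed as in the removed code:

```python
# Sketch only -- `estimator` is an Orca Estimator built as in the removed
# example above (Estimator.from_keras(...)); only the path convention matters.
save_path = "/dbfs/FileStore/model/ncf/ncf.h5"  # File API Format: starts with /dbfs

estimator.save(save_path)  # writes through the local /dbfs mount into DBFS
estimator.load(save_path)  # reloads from the same DBFS-backed path

# The same file appears as dbfs:/FileStore/model/ncf/ncf.h5 to Spark and
# dbutils.fs APIs; the /dbfs prefix is what makes it usable from plain Python.
```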