diff --git a/docs/readthedocs/source/doc/UserGuide/databricks.md b/docs/readthedocs/source/doc/UserGuide/databricks.md
index bd5d9af6..6b28d24e 100644
--- a/docs/readthedocs/source/doc/UserGuide/databricks.md
+++ b/docs/readthedocs/source/doc/UserGuide/databricks.md
@@ -27,8 +27,8 @@ init_script = """
 
 # install other necessary libraries, here we install libraries needed in this tutorial
 /databricks/python/bin/pip install tensorflow==2.9.1
-/databricks/python/bin/pip install pyarrow==8.0.0
-/databricks/python/bin/pip install psutil
+/databricks/python/bin/pip install tqdm
+/databricks/python/bin/pip install torch==1.11.0+cpu torchvision==0.12.0+cpu tensorboard -f https://download.pytorch.org/whl/torch_stable.html
 
 # copy bigdl jars to databricks
 cp /databricks/python/lib/python3.8/site-packages/bigdl/share/*/lib/*.jar /databricks/jars
@@ -54,8 +54,8 @@ Create a file **init.sh**(or any other filename) in your computer, the file cont
 
 # install other necessary libraries, here we install libraries needed in this tutorial
 /databricks/python/bin/pip install tensorflow==2.9.1
-/databricks/python/bin/pip install pyarrow==8.0.0
-/databricks/python/bin/pip install psutil
+/databricks/python/bin/pip install tqdm
+/databricks/python/bin/pip install torch==1.11.0+cpu torchvision==0.12.0+cpu tensorboard -f https://download.pytorch.org/whl/torch_stable.html
 
 # copy bigdl jars to databricks
 cp /databricks/python/lib/python3.8/site-packages/bigdl/share/*/lib/*.jar /databricks/jars
@@ -103,132 +103,12 @@ Output on Databricks:
 
 ![](images/init-orca-context.png)
 
-**Run ncf_train example on Databricks**
+**Run Examples**
 
-Create a notebook and run the following example. Note that to make things simple, we are just generating some dummy data for this example.
+- [Keras example on Databricks](https://github.com/intel-analytics/BigDL/blob/main/python/orca/tutorial/databricks/tf_keras_ncf.ipynb)
+- [PyTorch example on Databricks](https://github.com/intel-analytics/BigDL/blob/main/python/orca/tutorial/databricks/pytorch_fashion_mnist.ipynb)
 
-```python
-import math
-import argparse
-import os
-import random
-
-from bigdl.orca import init_orca_context, stop_orca_context, OrcaContext
-from bigdl.orca.learn.tf2.estimator import Estimator
-from pyspark.sql.types import StructType, StructField, IntegerType
-
-
-def build_model(num_users, num_items, class_num, layers=[20, 10], include_mf=True, mf_embed=20):
-    import tensorflow as tf
-    from tensorflow.keras.layers import Input, Embedding, Dense, Flatten, concatenate, multiply
-
-    num_layer = len(layers)
-    user_input = Input(shape=(1,), dtype='int32', name='user_input')
-    item_input = Input(shape=(1,), dtype='int32', name='item_input')
-
-    mlp_embed_user = Embedding(input_dim=num_users, output_dim=int(layers[0] / 2),
-                               input_length=1)(user_input)
-    mlp_embed_item = Embedding(input_dim=num_items, output_dim=int(layers[0] / 2),
-                               input_length=1)(item_input)
-
-    user_latent = Flatten()(mlp_embed_user)
-    item_latent = Flatten()(mlp_embed_item)
-
-    mlp_latent = concatenate([user_latent, item_latent], axis=1)
-    for idx in range(1, num_layer):
-        layer = Dense(layers[idx], activation='relu',
-                      name='layer%d' % idx)
-        mlp_latent = layer(mlp_latent)
-
-    if include_mf:
-        mf_embed_user = Embedding(input_dim=num_users,
-                                  output_dim=mf_embed,
-                                  input_length=1)(user_input)
-        mf_embed_item = Embedding(input_dim=num_users,
-                                  output_dim=mf_embed,
-                                  input_length=1)(item_input)
-        mf_user_flatten = Flatten()(mf_embed_user)
-        mf_item_flatten = Flatten()(mf_embed_item)
-
-        mf_latent = multiply([mf_user_flatten, mf_item_flatten])
-        concated_model = concatenate([mlp_latent, mf_latent], axis=1)
-        prediction = Dense(class_num, activation='softmax', name='prediction')(concated_model)
-    else:
-        prediction = Dense(class_num, activation='softmax', name='prediction')(mlp_latent)
-
-    model = tf.keras.Model([user_input, item_input], prediction)
-    return model
-
-
-if __name__ == '__main__':
-    executor_cores = 2
-    lr = 0.001
-    epochs = 5
-    batch_size = 8000
-    model_dir = "/dbfs/FileStore/model/ncf/"
-    backend = "ray"  # ray or spark
-    data_dir = './'
-    save_path = model_dir + "ncf.h5"
-
-    sc = init_orca_context(cluster_mode="spark-submit")
-
-    spark = OrcaContext.get_spark_session()
-
-    num_users, num_items = 6000, 3000
-    rdd = sc.range(0, 50000).map(
-        lambda x: [random.randint(0, num_users - 1), random.randint(0, num_items - 1), random.randint(0, 4)])
-    schema = StructType([StructField("user", IntegerType(), False),
-                         StructField("item", IntegerType(), False),
-                         StructField("label", IntegerType(), False)])
-    data = spark.createDataFrame(rdd, schema)
-    train, test = data.randomSplit([0.8, 0.2], seed=1)
-
-    config = {"lr": lr, "inter_op_parallelism": 4, "intra_op_parallelism": executor_cores}
-
-
-    def model_creator(config):
-        import tensorflow as tf
-
-        model = build_model(num_users, num_items, 5)
-        print(model.summary())
-        optimizer = tf.keras.optimizers.Adam(config["lr"])
-        model.compile(optimizer=optimizer,
-                      loss='sparse_categorical_crossentropy',
-                      metrics=['sparse_categorical_crossentropy', 'accuracy'])
-        return model
-
-
-    steps_per_epoch = math.ceil(train.count() / batch_size)
-    val_steps = math.ceil(test.count() / batch_size)
-
-    estimator = Estimator.from_keras(model_creator=model_creator,
-                                     verbose=False,
-                                     config=config,
-                                     backend=backend,
-                                     model_dir=model_dir)
-    estimator.fit(train,
-                  batch_size=batch_size,
-                  epochs=epochs,
-                  feature_cols=['user', 'item'],
-                  label_cols=['label'],
-                  steps_per_epoch=steps_per_epoch,
-                  validation_data=test,
-                  validation_steps=val_steps)
-
-    predictions = estimator.predict(test,
-                                    batch_size=batch_size,
-                                    feature_cols=['user', 'item'],
-                                    steps=val_steps)
-    print("Predictions on validation dataset:")
-    predictions.show(5, truncate=False)
-
-    print("Saving model to: ", save_path)
-    estimator.save(save_path)
-
-    # load with estimator.load(save_path)
-
-    stop_orca_context()
-```
+> Note that if you want to save a model to DBFS, or load a model from DBFS, the save/load path should be in the **File API Format** on Databricks, which means the path should start with `/dbfs`.
 
 ### **6. Other ways to install third-party libraries on Databricks if necessary**
 
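
As a quick illustration of the File API Format note this diff adds, here is a minimal, hypothetical sketch of saving and loading an Orca Keras `Estimator` on Databricks. It only reuses calls that appear in the example removed above (`init_orca_context`, `Estimator.from_keras`, `estimator.save`/`estimator.load`, `stop_orca_context`); the tiny stand-in model and the `/dbfs/FileStore/model/demo/` path are illustrative assumptions, not part of the documentation change itself.

```python
# Hypothetical sketch (not part of the docs change): saving/loading an Orca Keras
# Estimator on Databricks using the File API Format, i.e. paths starting with /dbfs.
from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.orca.learn.tf2.estimator import Estimator


def model_creator(config):
    # Tiny stand-in model; the linked notebooks build the real NCF / Fashion-MNIST models.
    import tensorflow as tf
    model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(2,))])
    model.compile(optimizer="adam", loss="mse")
    return model


sc = init_orca_context(cluster_mode="spark-submit")

estimator = Estimator.from_keras(model_creator=model_creator,
                                 model_dir="/dbfs/FileStore/model/demo/")  # File API Format

save_path = "/dbfs/FileStore/model/demo/model.h5"  # starts with /dbfs, so it lands in DBFS
estimator.save(save_path)   # write the Keras model to DBFS
estimator.load(save_path)   # later, restore it from the same DBFS path

stop_orca_context()
```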