{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "reported-geometry",
   "metadata": {},
   "source": [
    "In this example, we will use TensorFlow v1 (version 1.15) to create a simple MLP model, then migrate the application to Cluster Serving step by step.\n",
    "\n",
    "This tutorial is intended for TensorFlow v1 users only. If you are not a TensorFlow v1 user, the Keras tutorial [here](keras-to-cluster-serving-example.ipynb) is recommended instead."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "athletic-trance",
   "metadata": {},
   "source": [
    "### Original TensorFlow v1 Application"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "olive-dutch",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'1.15.0'"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import tensorflow as tf\n",
    "tf.__version__"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "vertical-recall",
   "metadata": {},
   "source": [
    "We first define the TensorFlow graph and create some data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "tropical-clinton",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "WARNING:tensorflow:From <ipython-input-2-853d23643c61>:24: where (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.\n",
      "Instructions for updating:\n",
      "Use tf.where in 2.0, which has the same broadcast rule as np.where\n"
     ]
    }
   ],
   "source": [
    "g = tf.Graph()\n",
    "with g.as_default():\n",
    "\n",
    "    # Graph Inputs\n",
    "    features = tf.placeholder(dtype=tf.float32,\n",
    "                              shape=[None, 2], name='features')\n",
    "    targets = tf.placeholder(dtype=tf.float32,\n",
    "                             shape=[None, 1], name='targets')\n",
    "\n",
    "    # Model Parameters\n",
    "    weights = tf.Variable(tf.zeros(shape=[2, 1],\n",
    "                                   dtype=tf.float32), name='weights')\n",
    "    bias = tf.Variable([[0.]], dtype=tf.float32, name='bias')\n",
    "\n",
    "    # Forward Pass\n",
    "    linear = tf.add(tf.matmul(features, weights), bias, name='linear')\n",
    "    ones = tf.ones(shape=tf.shape(linear))\n",
    "    zeros = tf.zeros(shape=tf.shape(linear))\n",
    "    prediction = tf.where(condition=tf.less(linear, 0.),\n",
    "                          x=zeros,\n",
    "                          y=ones,\n",
    "                          name='prediction')\n",
    "\n",
    "    # Backward Pass\n",
    "    errors = targets - prediction\n",
    "    weight_update = tf.assign_add(weights,\n",
    "                                  tf.reshape(errors * features, (2, 1)),\n",
    "                                  name='weight_update')\n",
    "    bias_update = tf.assign_add(bias, errors,\n",
    "                                name='bias_update')\n",
    "\n",
    "    train = tf.group(weight_update, bias_update, name='train')\n",
    "\n",
    "    saver = tf.train.Saver(name='saver')\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "legislative-boutique",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "((3, 2), (3,))"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import numpy as np\n",
    "x_train, y_train = np.array([[1,2],[3,4],[1,3]]), np.array([1,2,1])\n",
    "x_train.shape, y_train.shape"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "coated-grill",
   "metadata": {},
   "source": [
    "### Export TensorFlow SavedModel\n",
    "Then we train the graph and, inside the `with tf.Session()` block, save it in the SavedModel format. The detailed code follows; note that the prediction result is `[1.]` for the input `[1,2]`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "detailed-message",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Model parameters:\n",
      "\n",
      "Weights:\n",
      " [[15.]\n",
      " [20.]]\n",
      "Bias: [[5.]]\n",
      "[[1.]\n",
      " [1.]\n",
      " [1.]]\n",
      "WARNING:tensorflow:From <ipython-input-5-399ffbde562b>:26: simple_save (from tensorflow.python.saved_model.simple_save) is deprecated and will be removed in a future version.\n",
      "Instructions for updating:\n",
      "This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.simple_save.\n",
      "WARNING:tensorflow:From /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages/tensorflow_core/python/saved_model/signature_def_utils_impl.py:201: build_tensor_info (from tensorflow.python.saved_model.utils_impl) is deprecated and will be removed in a future version.\n",
      "Instructions for updating:\n",
      "This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.\n",
      "INFO:tensorflow:Assets added to graph.\n",
      "INFO:tensorflow:No assets to write.\n",
      "INFO:tensorflow:SavedModel written to: /tmp/mlp_tf1/saved_model.pb\n"
     ]
    }
   ],
   "source": [
    "with tf.Session(graph=g) as sess:\n",
    "\n",
    "    sess.run(tf.global_variables_initializer())\n",
    "\n",
    "    for epoch in range(5):\n",
    "        for example, target in zip(x_train, y_train):\n",
    "            feed_dict = {'features:0': example.reshape(-1, 2),\n",
    "                         'targets:0': target.reshape(-1, 1)}\n",
    "            _ = sess.run(['train'], feed_dict=feed_dict)\n",
    "\n",
    "    w, b = sess.run(['weights:0', 'bias:0'])\n",
    "    print('Model parameters:\\n')\n",
    "    print('Weights:\\n', w)\n",
    "    print('Bias:', b)\n",
    "\n",
    "    saver.save(sess, save_path='perceptron')\n",
    "\n",
    "    pred = sess.run('prediction:0', feed_dict={features: x_train})\n",
    "    print(pred)\n",
    "\n",
    "    # still inside this session, export the model in the SavedModel format\n",
    "    inputs = {features.name: features}\n",
    "    outputs = {prediction.name: prediction}\n",
    "    tf.saved_model.simple_save(sess, \"/tmp/mlp_tf1\", inputs, outputs)"
   ]
  },
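  {
   "cell_type": "markdown",
   "id": "savedmodel-sanity",
   "metadata": {},
   "source": [
    "Optionally, we can sanity-check the export by reloading the SavedModel into a fresh session and rerunning the prediction. The sketch below uses the TF1 `tf.saved_model.loader` API; the tensor names `features:0` and `prediction:0` come from the graph defined above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "savedmodel-sanity-code",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Reload the exported SavedModel into a fresh graph and rerun the prediction;\n",
    "# the output should match the [[1.] [1.] [1.]] printed above.\n",
    "with tf.Session(graph=tf.Graph()) as sess:\n",
    "    tf.saved_model.loader.load(sess, [tf.saved_model.tag_constants.SERVING], '/tmp/mlp_tf1')\n",
    "    print(sess.run('prediction:0', feed_dict={'features:0': x_train}))"
   ]
  },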
  {
   "cell_type": "markdown",
   "id": "consolidated-newport",
   "metadata": {},
   "source": [
    "### Deploy Cluster Serving\n",
    "After the model is prepared, we deploy it with Cluster Serving.\n",
    "\n",
    "First, install Cluster Serving:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "inner-texas",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Collecting bigdl-serving\n",
      "Requirement already satisfied: httpx in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from bigdl-serving) (0.17.1)\n",
      "Requirement already satisfied: pyarrow in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from bigdl-serving) (3.0.0)\n",
      "Requirement already satisfied: pyyaml in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from bigdl-serving) (5.4.1)\n",
      "Requirement already satisfied: redis in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from bigdl-serving) (3.5.3)\n",
      "Requirement already satisfied: opencv-python in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from bigdl-serving) (4.5.1.48)\n",
      "Requirement already satisfied: certifi in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from httpx->bigdl-serving) (2020.12.5)\n",
      "Requirement already satisfied: httpcore<0.13,>=0.12.1 in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from httpx->bigdl-serving) (0.12.3)\n",
      "Requirement already satisfied: rfc3986[idna2008]<2,>=1.3 in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from httpx->bigdl-serving) (1.4.0)\n",
      "Requirement already satisfied: sniffio in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from httpx->bigdl-serving) (1.2.0)\n",
      "Requirement already satisfied: h11==0.* in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from httpcore<0.13,>=0.12.1->httpx->bigdl-serving) (0.12.0)\n",
      "Requirement already satisfied: idna in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from rfc3986[idna2008]<2,>=1.3->httpx->bigdl-serving) (3.1)\n",
      "Requirement already satisfied: numpy>=1.14.5 in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages (from opencv-python->bigdl-serving) (1.20.1)\n",
      "Installing collected packages: bigdl-serving\n",
      "Successfully installed bigdl-serving-0.9.0\n"
     ]
    }
   ],
   "source": [
    "! pip install bigdl-serving"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "working-terrorism",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Trying to find config file in /home/user/anaconda3/envs/tf1/lib/python3.7/site-packages/bigdl/conf/config.yaml\r\n",
      "Config file found in pip package, copying...\r\n",
      "Config file ready.\r\n",
      "Cluster Serving has been properly set up.\r\n",
      "You did not specify ANALYTICS_ZOO_VERSION, will download 0.9.0\r\n",
      "ANALYTICS_ZOO_VERSION is 0.9.0\r\n",
      "BIGDL_VERSION is 0.12.1\r\n",
      "SPARK_VERSION is 2.4.3\r\n",
      "2.4\r\n",
      "You are installing Cluster Serving by pip, downloading...\r\n"
     ]
    }
   ],
   "source": [
    "import os\n",
    "! mkdir cluster-serving\n",
    "os.chdir('cluster-serving')\n",
    "! cluster-serving-init"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "excited-exception",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      " 2800K .......... .......... .......... .......... .......... 0% 11.8M 19m20s\r\n",
      " 2850K .......... .......... .......... .......... .......... 0% 11.3M 19m1s\r\n",
      " 2900K .......... .......... .......... .......... .......... 0% 8.60M 18m43s\r\n",
      " 2950K .......... .......... .......... .......... .......... 0% 11.9M 18m25s\r\n",
      " 3000K .......... .......... .......... .......... .......... 0% 11.8M 18m7s\r\n",
      " 3050K .......... .......... .......... .......... .......... 0% 674K 18m4s\r\n",
      " 3100K .......... .......... .......... .......... .......... 0% 418K 18m9s\r\n",
      " 3150K .......... .......... .......... .......... .......... 0% 1.05M 18m0s\r\n",
      " 3200K .......... .......... .......... .......... .......... 0% 750K 17m56s\r\n",
      " 3250K .......... .......... .......... ...."
     ]
    }
   ],
   "source": [
    "# cluster-serving-init downloads the serving jar via wget; check the download progress\n",
    "! tail wget-log"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "casual-premium",
   "metadata": {},
   "outputs": [],
   "source": [
    "# If you encounter a slow download like the one above, you can download the serving jar directly:\n",
    "# ! wget https://repo1.maven.org/maven2/com/intel/analytics/bigdl/bigdl-bigdl_0.12.1-spark_2.4.3/0.9.0/bigdl-bigdl_0.12.1-spark_2.4.3-0.9.0-serving.jar\n",
    "\n",
    "# If you downloaded with wget, or `ls` shows a \"bigdl-xxx-serving.jar\", run `mv *serving.jar bigdl.jar` after the download finishes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "ruled-bermuda",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "bigdl-bigdl_0.12.1-spark_2.4.3-0.9.0-serving.jar config.yaml wget-log\r\n"
     ]
    }
   ],
   "source": [
    "# After initialization finishes, check the directory\n",
    "! ls"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "computational-rehabilitation",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Rename the serving jar to bigdl.jar, as mentioned above\n",
    "! mv *serving.jar bigdl.jar"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "personal-central",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "config.yaml wget-log bigdl.jar\r\n"
     ]
    }
   ],
   "source": [
    "# Confirm the jar has been renamed to bigdl.jar\n",
    "! ls"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "combined-stability",
   "metadata": {},
   "source": [
    "We set the model path in `config.yaml` as follows (configuration details are at [Cluster Serving Configuration](https://github.com/intel-analytics/bigdl/blob/master/docs/docs/ClusterServingGuide/ProgrammingGuide.md#2-configuration)):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "received-hayes",
   "metadata": {},
   "outputs": [],
   "source": [
    "## BigDL Cluster Serving\n",
    "\n",
    "model:\n",
    "  # model path must be provided\n",
    "  path: /tmp/mlp_tf1"
   ]
  },
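  {
   "cell_type": "markdown",
   "id": "config-edit",
   "metadata": {},
   "source": [
    "Alternatively, the model path can be set programmatically. Below is a minimal sketch using `pyyaml` (installed above as a dependency of `bigdl-serving`); note that round-tripping with `yaml.safe_load`/`yaml.safe_dump` drops the comments in the file."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "config-edit-code",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional: set the model path in config.yaml from Python instead of editing by hand.\n",
    "# Caveat: yaml.safe_dump does not preserve the comments in the original file.\n",
    "import yaml\n",
    "\n",
    "with open('config.yaml') as f:\n",
    "    conf = yaml.safe_load(f)\n",
    "conf['model']['path'] = '/tmp/mlp_tf1'\n",
    "with open('config.yaml', 'w') as f:\n",
    "    yaml.safe_dump(conf, f)"
   ]
  },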
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "satellite-honey",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "## BigDL Cluster Serving\r\n",
      "\r\n",
      "model:\r\n",
      "  # model path must be provided\r\n",
      "  path: /tmp/mlp_tf1\r\n",
      "  # name, default is serving_stream, you need to specify if running multiple servings\r\n",
      "  name:\r\n",
      "data:\r\n",
      "  # default, localhost:6379\r\n",
      "  src:\r\n"
     ]
    }
   ],
   "source": [
    "! head config.yaml"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "planned-hometown",
   "metadata": {},
   "source": [
    "### Start Cluster Serving\n",
    "\n",
    "Cluster Serving requires Flink and Redis to be installed, with the corresponding environment variables set; see the [Cluster Serving Installation Guide](https://github.com/intel-analytics/bigdl/blob/master/docs/docs/ClusterServingGuide/ProgrammingGuide.md#1-installation) for details.\n",
    "\n",
    "The Flink cluster must be running before Cluster Serving starts. If it is not, run the following to start a local Flink cluster."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "antique-melbourne",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting cluster.\n",
      "Starting standalonesession daemon on host user-PC.\n",
      "Starting taskexecutor daemon on host user-PC.\n"
     ]
    }
   ],
   "source": [
    "! $FLINK_HOME/bin/start-cluster.sh"
   ]
  },
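  {
   "cell_type": "markdown",
   "id": "flink-check",
   "metadata": {},
   "source": [
    "Optionally, we can confirm that the local Flink cluster is up before submitting the serving job, for example by listing jobs with the standard Flink CLI:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "flink-check-code",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional sanity check: the CLI should connect to the cluster (no jobs are running yet)\n",
    "! $FLINK_HOME/bin/flink list"
   ]
  },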
  {
   "cell_type": "markdown",
   "id": "interested-bench",
   "metadata": {},
   "source": [
    "After configuration, start Cluster Serving with `cluster-serving-start` (details are in the [Cluster Serving Programming Guide](https://github.com/intel-analytics/bigdl/blob/master/docs/docs/ClusterServingGuide/ProgrammingGuide.md#3-launching-service))."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "modern-monster",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "model_path=\"/tmp/mlp_tf1\"\n",
      "redis_timeout=\"5000\"\n",
      "Redis maxmemory is not set, using default value 8G\n",
      "redis server started, please check log in redis.log\n",
      "OK\n",
      "OK\n",
      "OK\n",
      "redis config maxmemory set to 8G\n",
      "OK\n",
      "OK\n",
      "Starting new Cluster Serving job.\n",
      "Cluster Serving job submitted, check log in log-cluster_serving-serving_stream.txt\n",
      "To list Cluster Serving job status, use cluster-serving-cli list\n",
      "{maxmem=null, timeout=5000}timeout getted: 5000\n"
     ]
    }
   ],
   "source": [
    "! cluster-serving-start"
   ]
  },
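  {
   "cell_type": "markdown",
   "id": "serving-status",
   "metadata": {},
   "source": [
    "As the log above suggests, the status of the submitted Cluster Serving job can be listed with `cluster-serving-cli list`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "serving-status-code",
   "metadata": {},
   "outputs": [],
   "source": [
    "# List the Cluster Serving job status, as suggested in the log above\n",
    "! cluster-serving-cli list"
   ]
  },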
  {
   "cell_type": "markdown",
   "id": "improved-rough",
   "metadata": {},
   "source": [
    "### Prediction using Cluster Serving\n",
    "Next, we run the Cluster Serving client code in Python."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "id": "immune-madness",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "redis group exist, will not create new one\n",
      "redis group exist, will not create new one\n",
      "Write to Redis successful\n",
      "redis group exist, will not create new one\n",
      "Write to Redis successful\n"
     ]
    }
   ],
   "source": [
    "from bigdl.serving.client import InputQueue, OutputQueue\n",
    "input_queue = InputQueue()\n",
    "# Use the async API to put and get: pass a name argument when enqueuing and use the same name to query the result\n",
    "arr = np.array([1,2])\n",
    "input_queue.enqueue('my-input', t=arr)\n",
    "output_queue = OutputQueue()\n",
    "prediction = output_queue.query('my-input')\n",
    "# Use the sync API to predict: this blocks until the result is available or the request times out\n",
    "prediction = input_queue.predict(arr)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "id": "signal-attention",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "array([1.], dtype=float32)"
      ]
     },
     "execution_count": 23,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "prediction"
   ]
  },
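  {
   "cell_type": "markdown",
   "id": "result-check",
   "metadata": {},
   "source": [
    "As a final check (a minimal sketch, assuming `pred` from the TensorFlow session above is still in scope), we can verify that the served result matches the in-session prediction for the same input `[1,2]`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "result-check-code",
   "metadata": {},
   "outputs": [],
   "source": [
    "# pred[0] is the in-session TensorFlow prediction for the input [1,2]\n",
    "np.testing.assert_allclose(prediction, pred[0])\n",
    "print('Cluster Serving output matches the TensorFlow prediction')"
   ]
  },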
  {
   "cell_type": "markdown",
   "id": "suitable-selection",
   "metadata": {},
   "source": [
    "The `prediction` result is the same as with TensorFlow.\n",
    "\n",
    "This is the end of the tutorial. If you have any questions, you can raise an issue at [BigDL GitHub](https://github.com/intel-analytics/bigdl/issues)."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}