In this example, we use the `tensorflow.keras` package to build a Keras image classification application with the MobileNetV2 model, and then transfer the application to Cluster Serving step by step.

### Original Keras application
We first show an original Keras application, which downloads and preprocesses the data, then creates a MobileNetV2 model to make predictions.

In [1]:
import tensorflow as tf
import os
import PIL

In [2]:
tf.__version__

'2.2.0'

In [3]:
# Obtain data from url:"https://storage.googleapis.com/mledu-datasets/cats_and_dogs_filtered.zip"
zip_file = tf.keras.utils.get_file(origin="https://storage.googleapis.com/mledu-datasets/cats_and_dogs_filtered.zip",
                                   fname="cats_and_dogs_filtered.zip", extract=True)

# Locate the validation set directory
base_dir, _ = os.path.splitext(zip_file)
test_dir = os.path.join(base_dir, 'validation')
# Resize images to 160x160 (the 3 RGB channels give a 160x160x3 input)
image_size = 160

# Rescale all pixel values by 1./255 (no augmentation is applied to the validation set)
test_datagen = tf.keras.preprocessing.image.ImageDataGenerator(rescale=1./255)

# Create a generator that yields batches of images (and labels) from test_dir
test_generator = test_datagen.flow_from_directory(
                test_dir,
                target_size=(image_size, image_size),
                batch_size=1,
                class_mode='binary')

Found 1000 images belonging to 2 classes.
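The `rescale=1./255` argument multiplies every pixel value by 1/255, mapping 8-bit intensities in [0, 255] to floats in [0, 1]. The minimal NumPy sketch below reproduces only that transformation so you can see what the generator feeds to the model; `rescale_pixels` is a hypothetical helper, not part of Keras.

```python
import numpy as np

def rescale_pixels(images, scale=1.0 / 255):
    """Mimic ImageDataGenerator(rescale=...): multiply pixel values by `scale`."""
    # Cast to float32 first so uint8 inputs are not truncated to integers.
    return images.astype(np.float32) * scale

# A fake batch of one 2x2 RGB image with uint8 pixels in [0, 255].
batch = np.array([[[[0, 128, 255], [255, 255, 255]],
                   [[0, 0, 0], [51, 102, 204]]]], dtype=np.uint8)
scaled = rescale_pixels(batch)
print(scaled.shape, scaled.dtype, scaled.min(), scaled.max())
```

The first pixel values printed in the `arr` output later in this tutorial (e.g. 0.12156864, about 31/255) are in this [0, 1] range.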


In [4]:
# Create the base model from the pre-trained MobileNetV2 (ImageNet weights, no classification head)
IMG_SHAPE = (image_size, image_size, 3)
model = tf.keras.applications.MobileNetV2(input_shape=IMG_SHAPE,
                                          include_top=False,
                                          weights='imagenet')

In Keras, the input to `model.predict` can be an ndarray or a generator, so we could simply call `model.predict(test_generator)`. For simplicity, here we only feed the first batch to the model, which holds a single image because `batch_size=1`.

In [5]:
prediction=model.predict(test_generator.next()[0])
print(prediction)

[[[[0.         0.         0.         ... 0.         0.
    0.        ]
   [0.         0.         0.         ... 0.         0.
    0.        ]
   [0.         0.         0.         ... 0.         0.
    0.        ]
   [0.         0.         0.8406992  ... 0.         0.
    0.        ]
   [0.         0.         0.         ... 0.         0.
    0.        ]]

  [[0.         0.         0.         ... 0.         0.
    0.        ]
   [0.         0.         0.         ... 0.         0.81465054
    0.        ]
   [0.         0.         0.         ... 0.         0.6572695
    0.23970175]
   [0.         0.         0.         ... 0.         1.2423501
    0.8024192 ]
   [0.         0.         0.         ... 0.         0.
    0.        ]]

  [[0.         0.         0.         ... 0.         0.
    0.        ]
   [0.         0.         0.         ... 0.         5.185735
    0.21723604]
   [0.         0.         0.         ... 0.         4.6399093
    0.40124178]
   [0.3284886  0.         0.         .
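With `include_top=False`, MobileNetV2 returns its last convolutional feature map rather than class probabilities, which is why the output above is a 4-D array of non-negative activations. The network downsamples its input by an overall stride of 32 and, with the default width multiplier `alpha=1.0`, ends with 1280 channels, so one 160x160x3 image yields a 1x5x5x1280 array. The sketch below only encodes that arithmetic; `mobilenetv2_feature_shape` is a hypothetical helper, and it assumes input sides that are multiples of 32.

```python
def mobilenetv2_feature_shape(height, width, batch_size=1, channels=1280, total_stride=32):
    """Expected output shape of MobileNetV2(include_top=False) with alpha=1.0.

    Assumes height and width are multiples of the total stride (32),
    as with the 160x160 input used in this tutorial.
    """
    if height % total_stride or width % total_stride:
        raise ValueError("height and width should be multiples of %d" % total_stride)
    return (batch_size, height // total_stride, width // total_stride, channels)

print(mobilenetv2_feature_shape(160, 160))  # (1, 5, 5, 1280)
```

Comparing this with `prediction.shape` is a cheap way to confirm the model was built with the input size you intended.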

Great! The Keras application is now complete.

### Export TensorFlow SavedModel
Next, we transfer the application to Cluster Serving. The first step is to save the model in TensorFlow SavedModel format.

In [6]:
# Save the model to /tmp/transfer_learning_mobilenetv2
# (in TF 2.x, a path without an .h5 suffix is saved in SavedModel format)
model.save('/tmp/transfer_learning_mobilenetv2')
! ls /tmp/transfer_learning_mobilenetv2

Instructions for updating:
If using Keras pass *_constraint arguments to layers.
INFO:tensorflow:Assets written to: /tmp/transfer_learning_mobilenetv2/assets
assets	saved_model.pb	variables
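The `ls` output shows the standard SavedModel layout: a `saved_model.pb` graph file, a `variables/` directory holding the weights, and an `assets/` directory. Before pointing Cluster Serving at the path, a quick check like the following sketch can catch a wrong path early. `looks_like_saved_model` is a hypothetical helper; it only checks that the files exist and does not load the model.

```python
import os
import tempfile

def looks_like_saved_model(path):
    """Return True if `path` contains the files a TF SavedModel directory has."""
    has_graph = os.path.isfile(os.path.join(path, "saved_model.pb"))
    has_vars = os.path.isdir(os.path.join(path, "variables"))
    return has_graph and has_vars

# Example with a fake directory layout (no TensorFlow needed).
with tempfile.TemporaryDirectory() as d:
    os.makedirs(os.path.join(d, "variables"))
    open(os.path.join(d, "saved_model.pb"), "wb").close()
    print(looks_like_saved_model(d))                             # True
    print(looks_like_saved_model(os.path.join(d, "variables")))  # False
```

In the notebook you would call `looks_like_saved_model('/tmp/transfer_learning_mobilenetv2')`.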


### Deploy Cluster Serving
Once the model is prepared, we deploy it on Cluster Serving.

First, install Cluster Serving:

In [7]:
! pip install bigdl-serving

You should consider upgrading via the '/home/user/anaconda3/envs/rec/bin/python -m pip install --upgrade pip' command.


In [8]:
# Create a new directory, change into it, and initialize the Cluster Serving environment
! mkdir cluster-serving
os.chdir('cluster-serving')
! cluster-serving-init

Cluster Serving has been properly set up.
You did not specify BIGDL_VERSION, will download 0.9.0
BIGDL_VERSION is 0.9.0
BIGDL_VERSION is 0.12.1
SPARK_VERSION is 2.4.3
2.4
--2021-02-07 10:01:46--  https://repo1.maven.org/maven2/com/intel/analytics/bigdl/bigdl-spark_2.4.3/0.9.0/bigdl-spark_2.4.3-0.9.0-serving.jar
Resolving child-prc.intel.com (child-prc.intel.com)... You are installing Cluster Serving by pip, downloading...

SIGHUP received.
Redirecting output to ‘wget-log.2’.


In [11]:
! tail wget-log.2

  2150K .......... .......... .......... .......... ..........  0% 27.0K 5h37m
  2200K .......... .......... .......... .......... ..........  0% 33.6K 5h36m
  2250K .......... .......... .......... .......... ..........  0% 27.3K 5h37m
  2300K .......... .......... .......... .......... ..........  0% 30.3K 5h36m
  2350K .......... .......... .......... .......... ..........  0% 29.7K 5h36m
  2400K .......... .......... .......... .......... ..........  0% 23.7K 5h38m
  2450K .......... .......... .......... .......... ..........  0% 23.4K 5h39m
  2500K .......... .......... .......... .......... ..........  0% 23.4K 5h41m
  2550K .......... .......... .......... .......... ..........  0% 22.3K 5h43m
  2600K .......... .......... .......... .....

In [None]:
# If the download is slow, as shown above, you can download the jar directly instead:
# ! wget https://repo1.maven.org/maven2/com/intel/analytics/bigdl/bigdl-spark_2.4.3/0.9.0/bigdl-spark_2.4.3-0.9.0-serving.jar

# If you downloaded the jar with wget, or `ls` shows a file named "bigdl-xxx-serving.jar",
# rename it once the download finishes:
# ! mv *serving.jar bigdl.jar
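If you prefer to do the rename from Python rather than the shell, here is a minimal sketch equivalent to `mv *serving.jar bigdl.jar`. `rename_serving_jar` is a hypothetical helper, and it assumes exactly one matching jar exists in the directory.

```python
import glob
import os

def rename_serving_jar(directory=".", target="bigdl.jar"):
    """Rename the single '*serving.jar' file in `directory` to `target`."""
    matches = glob.glob(os.path.join(directory, "*serving.jar"))
    if len(matches) != 1:
        raise RuntimeError("expected exactly one *serving.jar, found %d" % len(matches))
    dest = os.path.join(directory, target)
    os.replace(matches[0], dest)  # overwrites `target` if it already exists
    return dest
```

Run it from the `cluster-serving` directory, e.g. `rename_serving_jar()`.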

In [13]:
# After initialization finishes, check the directory contents
! ls

config.yaml  bigdl.jar


We set the model path in `config.yaml` as follows (see [Cluster Serving Configuration](https://github.com/intel-analytics/bigdl/blob/master/docs/docs/ClusterServingGuide/ProgrammingGuide.md#2-configuration) for configuration details):

In [None]:
## BigDL Cluster Serving

model:
  # model path must be provided
  path: /tmp/transfer_learning_mobilenetv2
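You can edit `config.yaml` by hand, or set the path programmatically. The sketch below uses plain string handling (no YAML library) and assumes the layout shown by `head config.yaml` below, where `path:` is an indented key under the top-level `model:` section. `set_model_path` is a hypothetical helper.

```python
def set_model_path(config_text, model_path):
    """Return config_text with the `path:` entry under `model:` set to model_path."""
    lines = config_text.splitlines()
    in_model = False
    for i, line in enumerate(lines):
        stripped = line.strip()
        if not line.startswith((" ", "\t")) and stripped.endswith(":"):
            # A top-level section header such as "model:" or "data:".
            in_model = stripped == "model:"
        elif in_model and stripped.startswith("path:"):
            indent = line[: len(line) - len(line.lstrip())]
            lines[i] = "%spath: %s" % (indent, model_path)
            return "\n".join(lines) + "\n"
    raise ValueError("no 'path:' entry found under 'model:'")

example = """## BigDL Cluster Serving

model:
  # model path must be provided
  path:
  name:
data:
  src:
"""
print(set_model_path(example, "/tmp/transfer_learning_mobilenetv2"))
```

For the real file, read `config.yaml` into a string, pass it through `set_model_path`, and write the result back.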

In [14]:
! head config.yaml

## BigDL Cluster Serving

model:
  # model path must be provided
  path: /tmp/transfer_learning_mobilenetv2
  # name, default is serving_stream, you need to specify if running multiple servings
  name:
data:
  # default, localhost:6379
  src:


### Start Cluster Serving

Cluster Serving requires Flink and Redis to be installed and the corresponding environment variables to be set; see the [Cluster Serving Installation Guide](https://github.com/intel-analytics/bigdl/blob/master/docs/docs/ClusterServingGuide/ProgrammingGuide.md#1-installation) for details.
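A quick way to confirm the environment before starting is to check that the required variables are set. This tutorial relies on `FLINK_HOME` (used in the next cell); `REDIS_HOME` is an assumed name, so adjust the list to whatever the installation guide asks for. `missing_env_vars` is a hypothetical helper.

```python
import os

def missing_env_vars(names, env=None):
    """Return the names that are unset or empty in `env` (default: os.environ)."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]

# FLINK_HOME is used in this tutorial; REDIS_HOME is an assumed name, check the guide.
missing = missing_env_vars(["FLINK_HOME", "REDIS_HOME"])
if missing:
    print("Please set:", ", ".join(missing))
```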

The Flink cluster must be running before Cluster Serving starts. If it is not, run the following to start a local Flink cluster:

In [16]:
! $FLINK_HOME/bin/start-cluster.sh

Starting cluster.
Starting standalonesession daemon on host my-PC.
Starting taskexecutor daemon on host my-PC.


After configuration, start Cluster Serving with `cluster-serving-start` (see the [Cluster Serving Programming Guide](https://github.com/intel-analytics/bigdl/blob/master/docs/docs/ClusterServingGuide/ProgrammingGuide.md#3-launching-service) for details):

In [18]:
! cluster-serving-start

model_path="/tmp/transfer_learning_mobilenetv2"
redis_timeout="5000"
Redis maxmemory is not set, using default value 8G
redis server started, please check log in redis.log
OK
OK
OK
redis config maxmemory set to 8G
OK
OK
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/user/dep/flink-1.11.2/lib/bigdl-spark_2.4.3-0.9.0-SNAPSHOT-serving.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/user/dep/flink-1.11.2/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/user/dep/flink-1.11.2/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.apache.flink.client.cli.CliFrontend).
log4j:WARN Please initialize the log4j system properly.
log4j:WA

### Prediction using Cluster Serving
Next, we send prediction requests to Cluster Serving from the Python client.

In [19]:
from bigdl.serving.client import InputQueue, OutputQueue
input_queue = InputQueue()

redis group exist, will not create new one
redis group exist, will not create new one


In Cluster Serving, only NdArray is supported as input, so we first take an ndarray batch from the generator (if you are not sure how to convert your input to an NdArray, see the [data transform guide](https://github.com/intel-analytics/bigdl/tree/master/docs/docs/ClusterServingGuide/OtherFrameworkUsers#data)).
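For a Keras `DirectoryIterator` created with `class_mode='binary'`, each call to `next()` returns an `(images, labels)` tuple, so indexing with `[0]` keeps only the image batch. The sketch below wraps that step and makes the dtype explicit. `batch_to_ndarray` is a hypothetical helper, and casting to `float32` is an assumption that matches what the generator already produces here; a fake generator stands in for `test_generator` so the sketch runs without TensorFlow.

```python
import numpy as np

def batch_to_ndarray(generator):
    """Take the next (images, labels) batch from a Keras-style generator and
    return only the images as a contiguous float32 ndarray."""
    images, _labels = next(generator)
    return np.ascontiguousarray(images, dtype=np.float32)

# Stand-in for test_generator: yields one 160x160 RGB image with label 0.
def fake_generator():
    while True:
        yield np.zeros((1, 160, 160, 3), dtype=np.float64), np.array([0.0])

sample = batch_to_ndarray(fake_generator())
print(sample.shape, sample.dtype)  # (1, 160, 160, 3) float32
```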

In [20]:
arr = test_generator.next()[0]
arr

array([[[[0.12156864, 0.11764707, 0.10980393],
         [0.12156864, 0.11764707, 0.10980393],
         [0.11764707, 0.1137255 , 0.10588236],
         ...,
         [0.28627452, 0.29803923, 0.22352943],
         [0.24705884, 0.25882354, 0.18431373],
         [0.24705884, 0.24705884, 0.20000002]],

        [[0.15686275, 0.15294118, 0.14509805],
         [0.13725491, 0.13333334, 0.1254902 ],
         [0.09803922, 0.09411766, 0.08627451],
         ...,
         [0.31764707, 0.3254902 , 0.27450982],
         [0.31764707, 0.3254902 , 0.27058825],
         [0.2784314 , 0.2784314 , 0.2392157 ]],

        [[0.21960786, 0.21568629, 0.20784315],
         [0.23137257, 0.227451  , 0.21960786],
         [0.24705884, 0.24313727, 0.23529413],
         ...,
         [0.29411766, 0.29803923, 0.27450982],
         [0.26666668, 0.27058825, 0.2392157 ],
         [0.30588236, 0.30588236, 0.26666668]],

        ...,

        [[0.35686275, 0.3019608 , 0.15686275],
         [0.38431376, 0.29803923, 0.14509805]

In [23]:
# Async API: enqueue the input under a name, then use the same name to query the result
input_queue.enqueue('my-input', t=arr)
output_queue = OutputQueue()
prediction = output_queue.query('my-input')
# Sync API: predict() blocks until the result is returned or the request times out
prediction = input_queue.predict(arr)

Write to Redis successful
redis group exist, will not create new one
Write to Redis successful
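The async API returns immediately, so the result may not be ready the first time you query it. A common pattern is to poll with a timeout. The sketch below assumes that `OutputQueue.query` returns `None` while the result is not yet available; that behaviour is an assumption, so check the Cluster Serving client documentation for your version. `poll_result` is a hypothetical helper that accepts any zero-argument callable.

```python
import time

def poll_result(fetch, timeout=5.0, interval=0.1):
    """Call fetch() until it returns something other than None, or time out.

    Assumes fetch() returns None while the result is not ready.
    """
    deadline = time.monotonic() + timeout
    while True:
        result = fetch()
        if result is not None:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError("no result within %.1f s" % timeout)
        time.sleep(interval)

# Usage with Cluster Serving (assumed semantics of query):
# prediction = poll_result(lambda: output_queue.query('my-input'))
```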


In [24]:
prediction

array([[[0.        , 0.        , 0.        , ..., 0.        ,
         0.        , 0.        ],
        [0.        , 0.        , 0.        , ..., 0.        ,
         0.        , 0.        ],
        [0.        , 0.        , 0.        , ..., 0.        ,
         0.        , 0.        ],
        [0.        , 0.        , 0.        , ..., 1.3543907 ,
         0.        , 0.        ],
        [0.        , 0.        , 0.        , ..., 4.1898136 ,
         0.        , 0.        ]],

       [[0.        , 0.        , 0.        , ..., 0.        ,
         0.        , 0.        ],
        [0.        , 3.286649  , 0.        , ..., 0.        ,
         0.        , 0.        ],
        [0.        , 4.0817494 , 0.        , ..., 0.        ,
         0.        , 0.        ],
        [0.        , 3.3224926 , 0.        , ..., 1.4220613 ,
         0.        , 0.        ],
        [0.        , 0.        , 0.        , ..., 4.9100547 ,
         0.        , 0.        ]],

       [[0.        , 0.        , 0. 

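If everything works well, `prediction` contains the same values as the original Keras model's output for the same input (equal values, not the same object). Note that the Cluster Serving result above is a 3-D array, while the Keras output is 4-D with a leading batch dimension, and that `arr` is a different image from the one predicted earlier, so the printed values are not directly comparable.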
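To verify this, run the same image through both paths and compare the arrays numerically. Judging from the outputs above, the Keras result carries a leading batch dimension that the Cluster Serving result lacks, so the sketch drops it before comparing. The tolerances are assumptions, since serving may introduce small floating-point differences; `outputs_match` is a hypothetical helper.

```python
import numpy as np

def outputs_match(serving_out, keras_out, rtol=1e-4, atol=1e-5):
    """Compare a Cluster Serving result with a Keras result for the same single input.

    Drops a leading batch dimension of size 1 from the Keras output if present.
    """
    serving_out = np.asarray(serving_out)
    keras_out = np.asarray(keras_out)
    if keras_out.ndim == serving_out.ndim + 1 and keras_out.shape[0] == 1:
        keras_out = keras_out[0]
    return keras_out.shape == serving_out.shape and np.allclose(
        serving_out, keras_out, rtol=rtol, atol=atol)

# In the notebook, with the same `arr` for both paths:
# outputs_match(prediction, model.predict(arr))
```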

In [25]:
# Don't forget to delete the model saved for this tutorial
! rm -rf /tmp/transfer_learning_mobilenetv2

This is the end of this tutorial. If you have any questions, please raise an issue on [BigDL GitHub](https://github.com/intel-analytics/bigdl/issues).