How to use MLflow on AWS EC2 (without Databricks) to deploy PySpark models on AWS SageMaker


Lately I have been trying to deploy my PySpark ML models to production using MLflow, MLeap and SageMaker. Doing this is quite easy on Databricks, as it manages most of the packages, but since I was developing my model on an AWS EC2 instance using PySpark there were a couple of challenges. I am listing the steps and installations here, hoping someone may find them useful.

We will be using an Amazon Linux EC2 instance to create our model. I will assume you already have an AWS account and a Linux EC2 instance launched and ready to work with, and that you have configured an IAM role for the instance so it can access S3 (for saving artifacts) and SageMaker (for deploying the model).

Below is the list of tools we need to install and configure in order to deploy models to SageMaker using MLflow:

  1. Docker
  2. Python & Jupyter notebook setup
  3. PySpark
  4. MLflow
  5. MLeap (optional - needed only for PySpark models)
 

Docker Setup

Update the packages on your instance
    > sudo yum update -y
Install Docker
    > sudo yum install docker -y
Start the Docker Service
    > sudo service docker start
Add the ec2-user to the docker group so you can execute Docker commands without using sudo.
    > sudo usermod -a -G docker ec2-user
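To confirm Docker is set up correctly (an optional check, not part of the original steps), log out and back in so the group change takes effect, then run the hello-world image:
    > docker run hello-world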
 

Install Python, pip & create virtualenv

Install Python (in my case I installed Python 3.7; please check your version)
    > sudo yum install python -y
Install Pip
    > sudo yum install pip -y
Install virtualenv
    > pip install virtualenv
Create & activate a virtualenv, then install Jupyter Notebook and register the environment as a kernel
    > virtualenv venv
> source venv/bin/activate
> pip install ipython
> pip install ipykernel
> pip install notebook
> ipython kernel install --user --name=venv
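To verify that the kernel was registered (an optional check on my part), you can list the available kernels:
    > jupyter kernelspec list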

Important: Before we proceed to the next steps, please install Java and configure the JAVA_HOME environment variable, as we will need it to run Spark.
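For reference, here is one way to do that on Amazon Linux; treat it as a sketch, since the package name and Java path can differ depending on your AMI (Spark 2.4.5 runs fine on Java 8):
    > sudo yum install java-1.8.0-openjdk -y
> export JAVA_HOME=$(dirname $(dirname $(readlink -f $(which java))))
> export PATH=$JAVA_HOME/bin:$PATH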

Install PySpark

Download Spark 2.4.5
    > wget http://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
Extract tgz file
    > tar -xzvf spark-2.4.5-bin-hadoop2.7.tgz
Set up environment variables (adjust SPARK_HOME to wherever you extracted Spark)
    > export SPARK_HOME=/home/ec2-user/venv_pradeep/spark-2.4.5-bin-hadoop2.7
> export PATH=$JAVA_HOME/bin:$SPARK_HOME/bin:$PATH
Alternatively, you can also install the pyspark package to use from Python
    > pip install pyspark
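As a quick sanity check (my addition, not part of the original steps), you can start a local SparkSession and print its version - run this in the pyspark shell, or in plain Python if you installed the pyspark package:
from pyspark.sql import SparkSession

# start a throwaway local session just to confirm PySpark and Java are wired up
spark = SparkSession.builder.master("local[*]").appName("sanity-check").getOrCreate()
print(spark.version)
spark.stop()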

Install & Configure MLflow Server

Install MLflow
    > pip install mlflow
Install python-dateutil pinned to 2.6.1 to avoid a version conflict
    > pip install -U python-dateutil==2.6.1
Check the installation
    > mlflow --help
Install nginx server
    > sudo yum install nginx
> sudo service nginx start
Install httpd tools
    > sudo yum install httpd-tools
Set a username & password for basic auth (testuser here is just an example)
    > sudo htpasswd -c /etc/nginx/.htpasswd testuser
Modify nginx config file
    > sudo nano /etc/nginx/nginx.conf
Add a reverse proxy to port 5000 inside the existing server block
location / {
    proxy_pass http://localhost:5000/;
    auth_basic "Restricted Content";
    auth_basic_user_file /etc/nginx/.htpasswd;
}
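For context, this location block sits inside the server block that ships with the default nginx.conf; a trimmed sketch of how that looks (the listen port and server_name follow the Amazon Linux defaults, adjust to your config):
server {
    listen       80;
    server_name  _;

    location / {
        proxy_pass http://localhost:5000/;
        auth_basic "Restricted Content";
        auth_basic_user_file /etc/nginx/.htpasswd;
    }
}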
Reload nginx
    > sudo service nginx reload
Start the MLflow server
Create an S3 bucket to store models and artifacts, then start the MLflow tracking server. It will use port 5000 by default:
    > mkdir -p ~/mlflow/experiments ~/mlflow/logs
    > nohup mlflow server --backend-store-uri ~/mlflow/experiments/ --default-artifact-root s3://<your_s3_bucket_name>/ --host 0.0.0.0 > ~/mlflow/logs/mlflow_tserver.log 2>&1 &
Now you can access your server using its public DNS:
    > http://XXXXXXXXXXXXXXXXXXXXXXXXXXX.compute.amazonaws.com/
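To confirm the tracking server and the nginx basic auth are both working, a quick check from your local machine (testuser and the password are whatever you set with htpasswd above):
    > curl -u testuser:<password> http://<ec2_public_dns>/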

Install MLeap

For storing our model on S3 and deploying it to SageMaker we will use MLeap, so let's install MLeap too and download all the required JARs.
Install MLeap
    > pip install mleap==0.15.0

List of JARs to download & save in the $SPARK_HOME/jars folder (an alternative using spark.jars.packages is sketched after the list)

  1. hadoop-aws-2.7.3.jar
  2. aws-java-sdk-1.7.4.jar
  3. jets3t-0.9.4.jar
  4. mleap-base_2.11-0.15.0.jar
  5. mleap-core_2.11-0.15.0.jar
  6. mleap-runtime_2.11-0.15.0.jar
  7. mleap-spark_2.11-0.15.0.jar
  8. mleap-spark-base_2.11-0.15.0.jar
  9. mleap-tensor_2.11-0.15.0.jar
  10. bundle-hdfs_2.11-0.15.0.jar
  11. bundle-ml_2.11-0.15.0.jar
  12. scalapb-runtime_2.11-0.9.0-RC1.jar
  13. lenses_2.11-0.9.0-RC1.jar
  14. config-1.3.4.jar
  15. spray-json_2.11-1.3.5.jar
  16. scala-arm_2.11-2.0.jar
  17. protobuf-java-3.8.0.jar
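If you would rather not download the MLeap JARs by hand, one alternative (a sketch, assuming the instance can reach Maven Central) is to let Spark resolve the MLeap and S3 connector dependencies when building the session:
from pyspark.sql import SparkSession

# let Spark pull MLeap (Scala 2.11, 0.15.0) and the S3A connector, plus their transitive dependencies, from Maven Central
spark = (SparkSession.builder
         .master("local[*]")
         .appName("mleap-deps")
         .config("spark.jars.packages",
                 "ml.combust.mleap:mleap-spark_2.11:0.15.0,"
                 "org.apache.hadoop:hadoop-aws:2.7.3")
         .getOrCreate())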
Now we have installed all the tools we need, so let's start our notebook.
    > jupyter notebook

Important - Note that we haven't opened any public access for our Jupyter notebook. You can either create an SSH tunnel to access it or configure public access. To create a tunnel, just run the following command.

> ssh -N -L <local_port>:127.0.0.1:<remote_port> -i <pem_file_location> ec2-user@<ec2_public_ip>
By default Jupyter runs on port 8888, so you can use the same port on both ends. You can then access the notebook at http://localhost:8888
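For example, with the default port 8888 on both ends (the key file and IP are placeholders):
    > ssh -N -L 8888:127.0.0.1:8888 -i <pem_file_location> ec2-user@<ec2_public_ip>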

Well, that's it for the installations.

Let's create a simple model:

I am using the wine quality dataset for this article, which is stored in S3.
from pyspark.sql import SparkSession
from pyspark import SparkConf
import boto3
import pandas as pd
import io

app_name = "Pyspark Model"
spark = SparkSession.builder.master('local[*]').appName(app_name).enableHiveSupport().getOrCreate()

# Let's read the data from S3 and create a Spark dataframe
s3 = boto3.client('s3')
obj = s3.get_object(Bucket='sagemaker-mlflow-pyspark-test', Key='data/wine_quality.csv')
inputDF = spark.createDataFrame(pd.read_csv(io.BytesIO(obj['Body'].read())))

# Let's list out the features to be used by our model
featureColumns = [c for c in inputDF.columns if c != 'quality']
# Let's import all the necessary packages
import sys
import logging
import mlflow
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
import mlflow.mleap
# Let's point to our MLflow tracking server
TRACKING_URI = 'http://<user_name>:<password>@<ec2_public_address>/'
EXPERIMENT_NAME = "pyspark_experiments"
client = mlflow.tracking.MlflowClient(TRACKING_URI)
mlflow.set_tracking_uri(TRACKING_URI)
mlflow.set_experiment(EXPERIMENT_NAME)
print("MLflow Version:", mlflow.version.VERSION)
print("Tracking URI:", mlflow.tracking.get_tracking_uri())
print("experiment_name:", EXPERIMENT_NAME)
# Let's create a simple RandomForest model
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer
(trainingDF, testDF) = inputDF.randomSplit([0.7, 0.3])
def train_model(location=None):
    assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
    rf = RandomForestRegressor(featuresCol="features", labelCol="quality", numTrees=30, maxDepth=10, maxBins=128, minInstancesPerNode=5, seed=123)
    pipeline = Pipeline(stages=[assembler, rf])
    model = pipeline.fit(trainingDF)
    predictions = model.transform(testDF)
    evaluator = RegressionEvaluator(labelCol="quality", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    # log the parameters actually used by the model, plus the evaluation metric
    mlflow.log_param("featuresCol", "features")
    mlflow.log_param("labelCol", "quality")
    mlflow.log_param("numTrees", 30)
    mlflow.log_param("maxDepth", 10)
    mlflow.log_param("maxBins", 128)
    mlflow.log_param("minInstancesPerNode", 5)
    mlflow.log_param("seed", 123)
    mlflow.log_metric("rmse", rmse)
    print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
    # serialize the fitted pipeline to an MLeap bundle and log it to the tracking server
    model.serializeToBundle(location, model.transform(testDF))
    mlflow.mleap.log_model(spark_model=model, sample_input=testDF, artifact_path="spark_rf_pipeline_model")
    mlflow.end_run()

Let's run the model

with mlflow.start_run() as run:
    print("client",client)

    print("run_id:",run.info.run_uuid)
    print("experiment_id:",run.info.experiment_id)
    train_model('jar:file:/tmp/mymodel.zip')

Deploy the model to SageMaker as a REST endpoint

You will need an ECR image in AWS to deploy the model as a Docker container. The ECR URL will look something like this: {account_id}.dkr.ecr.{region}.amazonaws.com/{repo_name}:{tag}
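If you don't already have a serving image in ECR, MLflow ships a helper that builds its default mlflow-pyfunc serving container and pushes it to ECR (assuming Docker is running and your AWS credentials are allowed to push to ECR):
    > mlflow sagemaker build-and-push-container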

Let's deploy our model

import mlflow.sagemaker as mfs

IMAGE_ECR_URL = "{account_id}.dkr.ecr.{region}.amazonaws.com/{repo_name}:{tag}"
APP_NAME = "ec2PySparkModel"
# you can get this from the MLflow UI or using the get_artifact_uri() method
model_uri = "<saved_artifact_location>"
REGION = "us-east-1"
# deploy the model
mfs.deploy(
    app_name=APP_NAME,
    model_uri=model_uri,
    image_url=IMAGE_ECR_URL,
    region_name=REGION,
    mode="create",
    flavor="mleap"
)
The model will be available for predictions once the status changes to "InService". You can easily check the status with:
def check_status(app_name):
    sage_client = boto3.client('sagemaker', region_name=REGION)
    endpoint_description = sage_client.describe_endpoint(EndpointName=app_name)
    endpoint_status = endpoint_description["EndpointStatus"]
    return endpoint_status
print("Application status is: {}".format(check_status(APP_NAME)))

That's it - let's start predicting.

import json
input_json = testDF.limit(2).toPandas().to_json(orient="split")
# print("Using input dataframe JSON: {}".format(input_json))
def query_endpoint(app_name, input_json):
    client = boto3.session.Session().client("sagemaker-runtime", REGION)
    response = client.invoke_endpoint(
        EndpointName=app_name,
        Body=input_json,
        ContentType='application/json',
    )
    preds = response['Body'].read().decode("ascii")
    preds = json.loads(preds)
    print("Received response: {}".format(preds))
    return preds
print("Sending batch prediction request with input dataframe json: {}".format(input_json))
# Evaluate the input by posting it to the deployed model
prediction1 = query_endpoint(app_name=APP_NAME, input_json=input_json)
prediction1
You should get predictions back, e.g.:
[5.905582010582011, 5.270440875244407]

Once the endpoint is no longer needed, we can delete it to avoid any charges.

mfs.delete(app_name=APP_NAME, region_name=REGION, archive=False)

def get_active_endpoints(app_name):
    sage_client = boto3.client('sagemaker', region_name=REGION)
    app_endpoints = sage_client.list_endpoints(NameContains=app_name)["Endpoints"]
    return list(filter(lambda en : en == app_name, [str(endpoint["EndpointName"]) for endpoint in app_endpoints]))
print("The following endpoints exist for the `{an}` application: {eps}".format(an=APP_NAME, eps=get_active_endpoints(APP_NAME)))

~ WELL, THAT'S IT !! ~

MLFlow Databricks AWS Sagemaker PySpark ec2
