Recently, I’ve been working to deploy a new machine learning model into a production environment. This is the first time I’ve had to deploy a model that runs across such huge datasets. The requirement is to make 30,000,000 predictions each time the model runs.
In terms of the pipeline, it’s three distinct phases. The first, is a feature generation which provides the raw data, cleaned up & structured. The next phase, is to aggregate those nice, clean features. The final stage is to take the aggregated features & score them.
These are decoupled – rather than making them a single monolithic script, it’s far easier to troubleshoot and performance tune when they’re separate and it also leads to fault tolerance – e.g. if phase 1 fails, phase 2 and 3 can continue to run.
In this particular article, I’m going to cover the scoring script. This script pulls data in using the scalability of Apache Spark. It then converts the dataframe to Pandas; runs the dataframe through the models and then writes the data to HDFS.
To grab the data in Spark, it’s super simple; we use Spark SQL to select the data we need from our database table & save that to a dataframe called ‘features_raw’.
import pandas as pd import numpy as np import pickle from pyspark.sql import SparkSession import pyspark.sql.functions as sqlfunc from pyspark.sql import * from pyspark.sql.functions import * from pyspark.sql.types import * import os spark = SparkSession\ .builder\ .appName('scoring')\ .master('yarn')\ .enableHiveSupport()\ .getOrCreate() get_data = '''select * from database.tablename''' features_raw = spark.sql(get_data)
Next, we convert our Spark dataframe to Pandas. This may not be a trivial task, as it can lead to memory exceptions. To get around this; I loop through the original dataframe, scoring 1 million rows at a time, rather than attempting to do all the rows at once.
features = features_raw.toPandas()
Next, we open our pickled models; make predictions using those models & write the result back to the features dataframe.
with open('/path/to/pickle/picklename.pickle', 'rb') as file: model = pickle.load(file) predictions = model.predict(features[['field1', 'field2', 'field3']]) features['prediction'] = predictions
Next, we need to output the predictions to CSV.
features.to_csv('/path/to/csv/output_'+str(i)+'.csv', mode='w+', header=None, encoding="utf-8")
Now, having these in CSV’s is not very helpful. We’re going to save these in a Hive table. It’s going to run every day, so that table needs to be partitioned. So first, I’ll create the table I need.
CREATE EXTERNAL TABLE database.table_name ( `id` int, `subject` string, `field1` string, `field2` string, `field3` string, `prediction` string) PARTITIONED BY (`date` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;
Now, I need to automate the creation of a partition. So, each time the model loops, it’ll say ‘is the partiton I need available?’. If the answer is yes, it’ll proceed to put data into that partition. If the answer is no, it’ll create it.
now = datetime.now() yest = int(now.strftime('%s')) - 24*60*60*1 dt_to_use = datetime.fromtimestamp(yest).strftime('%Y%m%d') cmd = 'hive -e ' + '"ALTER TABLE database.table ADD IF NOT EXISTS PARTITION(date=xxxxxx)"' cmd = cmd.replace("xxxxxx", dt_to_use) output = os.system(cmd)
Finally, we move the csv we generated to HDFS, in our Hive table directory. When you go to query the table, you’ll find that your partition is now populated with data.
cmd2 = 'hdfs dfs -put -f /local/path/to/csv/output_'+str(i)+'.csv /hdfs/path/to/hive/table/date='+dt_to_use+'/output_'+str(i)+'.csv' output = os.system(cmd2)
Hopefully this overview was useful!