Apache Spark gives us a framework for crunching huge amounts of data efficiently by leveraging parallelism, which is great! Tuning it can be a bit of an art, which is not so great!
In this article, I will talk about some of the ways that I have successfully tuned the performance of my scripts in the past.
Do you really need that?
The first approach is to remove the fields you don’t need. It’s really easy to take the approach below, whereby we just select everything from our datasource.
logs = spark_session.sql("Select * from db.table where dt = " + yday_date)
This presents us with a bit of an issue, though. If you have 1,000 rows and 100 columns but only need 10 of those columns, you’re spending time loading unwanted data into memory and lugging a heavy dataframe around with you. So, the simplest way to speed up our Spark scripts is to drop the data we don’t need. My dataframe definition may then be altered to look like this.
logs = spark_session.sql("Select date, customerid, orderid, price from db.table where dt = " + yday_date)
Along the same lines, if you don’t need all the raw data and can work with aggregated data, you should pre-aggregate it. In the example below, I’ve taken the total value of orders per customer on a single day.
logs = spark_session.sql("Select date, customerid, sum(price) as value from db.table where dt = " + yday_date + " group by date, customerid")
By doing this aggregation in the query, we bring significantly less data into our Spark dataframe and hence can achieve greater performance.
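The same pruning and pre-aggregation can also be written with the DataFrame API rather than a SQL string. This is only a minimal sketch: it assumes `spark_session` is an existing SparkSession and reuses the table and column names from the queries above, and the function name `daily_customer_value` is hypothetical.

```python
def daily_customer_value(spark_session, yday_date):
    """Return one row per (date, customerid) with the summed order value.

    Assumes db.table has the columns used in the article: dt, date,
    customerid and price. yday_date is interpolated as-is, mirroring
    the SQL strings in the article.
    """
    return (
        spark_session.table("db.table")
        # Filter early so later steps only see the day we care about.
        .where("dt = " + str(yday_date))
        # Keep only the columns the aggregation needs.
        .select("date", "customerid", "price")
        .groupBy("date", "customerid")
        # Dict-style agg names the result column "sum(price)".
        .agg({"price": "sum"})
        .withColumnRenamed("sum(price)", "value")
    )
```

Because the function only calls methods on the session it is given, it can be dropped into an existing script as `logs = daily_customer_value(spark_session, yday_date)`.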
To cache or not to cache?
That really is the question!
Unfortunately, there isn’t an entirely straightforward answer, and I would recommend A/B testing your scripts with and without caching. Here are my thoughts, though: if you can answer yes to either of the questions below, then caching is worth investigating.
First off, should we bother caching the data? We need to consider whether your datasource is inherently slow or whether the data is really big. That is, would reading the same data multiple times bring a large data ingestion overhead? Does it take minutes or even hours to read the data into Spark before processing begins?
Now let’s consider whether you ingest the same data multiple times within your script. And when you ingest that data, do you carry out expensive computations on it? That is, do you aggregate, apply string functions, run UDFs or create new calculated fields?
These are the two primary reasons I see for trialing caching: either your data is really big or your datasource is really slow, or you use the same data multiple times within your script.
Alright, but what type of cache?
Now that we’ve decided that we should look into caching, we need to decide what kind of caching we’re going to use. Let’s go back to our example above and tweak it a bit.
In the snippet below, I’ve simply written .cache() at the end of our dataframe definition. This tells Spark to cache the dataframe, keeping as much of it in memory as it can. This may not always be appropriate; it really depends on how much memory you have and how much data you’re trying to fit into it.
logs = spark_session.sql("Select date, customerid, sum(price) as value from db.table where dt = " + yday_date + " group by date, customerid").cache()
x = logs.count()
You can see I also added a count. This is because Spark is lazy: calling .cache() only marks the dataframe for caching, and nothing is actually stored until you run an action such as count() on that dataset.
As I mentioned above, the default caching behaviour may not always be appropriate, so we also have a .persist() option, which is much more flexible. Called with no arguments, it behaves like .cache(): for dataframes, the default storage level keeps the data in memory and spills whatever does not fit to disk.
logs = spark_session.sql("Select date, customerid, sum(price) as value from db.table where dt = " + yday_date + " group by date, customerid").persist()
x = logs.count()
But we can be much more specific. We can choose a storage level explicitly with persist(StorageLevel.<level>), where the level is one of the constants Spark defines, such as MEMORY_ONLY, MEMORY_AND_DISK or DISK_ONLY. It may take some trial and error, but you may find one is better suited to your environment than the others.
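Choosing a storage level explicitly could look like the sketch below. It assumes `logs` is an existing dataframe. The helper name `persist_and_materialize` is hypothetical, while `StorageLevel.DISK_ONLY` is one of the constants PySpark provides in `pyspark.StorageLevel`.

```python
def persist_and_materialize(df, storage_level=None):
    """Persist df at the given storage level and force it to be computed.

    storage_level is a pyspark.StorageLevel constant; None keeps
    Spark's default level for persist().
    """
    df = df.persist() if storage_level is None else df.persist(storage_level)
    df.count()  # persist() is lazy; running an action populates the cache
    return df


# Example usage (needs pyspark and an existing dataframe named logs):
# from pyspark import StorageLevel
# logs = persist_and_materialize(logs, StorageLevel.DISK_ONLY)
```

Passing the level in as an argument makes it easy to rerun the same script with different levels and compare the timings.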
OK, I’ve done it, but it’s taking ages!
Here is something I ran into: the cache (no matter which storage level I used) was just slow. It took 1 hour to cache my data and, once cached, the operations on that cache were slow too. The script with caching enabled took 2.5 hours, compared to 1.5 hours with caching disabled. So, as I mentioned above, caching may not actually be necessary. It turned out in my case that, because I was selecting the raw log files (every row) directly from Hive, the response was rapid (even faster than caching the dataset in memory). So, it’ll be specific to your environment and what you’re trying to achieve.
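A with/without comparison like the one described above can be scripted with a small timing harness. The sketch below is not tied to any particular job: `build_df` and `work` are hypothetical callables you would supply, the first returning a fresh dataframe and the second running whatever actions your script performs on it.

```python
import time


def time_run(label, run):
    """Call run() once and return (label, elapsed seconds, result)."""
    start = time.perf_counter()
    result = run()
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.1f}s")
    return label, elapsed, result


def compare_cached_vs_uncached(build_df, work):
    """Time work(df) on a fresh dataframe without and with caching.

    build_df: zero-argument callable returning a new dataframe.
    work: callable that runs the script's actions on that dataframe.
    Returns (uncached_seconds, cached_seconds).
    """
    uncached = time_run("uncached", lambda: work(build_df()))

    def cached_run():
        df = build_df().cache()
        try:
            df.count()  # action that materializes the cache
            return work(df)
        finally:
            df.unpersist()  # release the cached data afterwards

    cached = time_run("cached", cached_run)
    return uncached[1], cached[1]
```

The cached timing deliberately includes the time spent filling the cache, since that cost is part of the total runtime of the script. A single run of each variant is only a rough guide, so it is worth repeating the comparison a few times.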
UDFs are the devil
What can I say… avoid UDFs unless you absolutely have no choice. That’s because Spark itself is pretty awesome at optimizing our jobs when we use Spark functions. But when we take a Python function and crowbar it into our Spark scripts, it acts as a black box to Spark, so Spark cannot see inside it and makes no attempt to optimize that part of the script.
That isn’t to say you should never use them; just be careful about when you use them and if you can use a case statement or a Spark function, that will be far more efficient.
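As a rough illustration of swapping a UDF for a case statement, the sketch below shows a small piece of Python logic next to an equivalent SQL expression that Spark can optimize. Everything specific here is an assumption made for the example: the threshold of 100, the `price_band` column name and both helper names are hypothetical, and `df` is assumed to have a numeric `price` column.

```python
def price_band(price):
    """Plain Python logic that might otherwise be wrapped in a UDF."""
    if price is None:
        return None
    return "high" if price >= 100 else "low"


# The same rule as a SQL CASE expression, which Spark evaluates natively.
# A null price matches neither branch, so the result is null.
PRICE_BAND_SQL = (
    "CASE WHEN price >= 100 THEN 'high' "
    "WHEN price IS NOT NULL THEN 'low' END AS price_band"
)


def add_price_band(df):
    """Add a price_band column without going through a Python UDF."""
    return df.selectExpr("*", PRICE_BAND_SQL)
```

With the case expression, the whole calculation stays inside Spark’s own engine, whereas the UDF version would have to call back into Python for every row.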
Is that it?
You’d like that, wouldn’t you…
Unfortunately, it’s never that simple. But these are the areas that I have found to bring the biggest performance gains. Others include repartitioning or coalescing the data and, of course, unpersisting cached dataframes throughout the script where possible.
I’ll follow up with future articles to cover more performance tuning ideas in more detail.
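One way to make sure cached data is released as soon as it is no longer needed is to wrap the caching in a context manager. This is a minimal sketch; the name `cached` is hypothetical and `df` can be any existing dataframe.

```python
from contextlib import contextmanager


@contextmanager
def cached(df):
    """Cache df for the duration of a with-block, then release it."""
    df = df.cache()
    try:
        df.count()  # materialize the cache before it is reused
        yield df
    finally:
        df.unpersist()  # runs even if the block raises an error


# Example usage (needs an existing dataframe named logs):
# with cached(logs) as cached_logs:
#     total_rows = cached_logs.count()
```

On the partitioning side, `coalesce(n)` reduces the number of partitions without a full shuffle, whereas `repartition(n)` performs one, so which of the two helps depends on the job.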
For now, happy tuning!