Apache Spark provides us with a framework to crunch a huge amount of data efficiently by leveraging parallelism which is great! However, with great power, comes great responsibility; because, optimising your scripts to run efficiently, is not so easy.
Within our scripts, we need to look to minimize the data we bring in; avoid UDF’s where possible and seek to cache/persist dataframes which will be used over and over again. I’ve discussed this in detail here. Sometimes though, even with these optimizations, your script may take a very long time to execute and sometimes, it may fail due to memory, timeout or other errors.
This article will take you through some of the configuration changes we can make when we submit our jobs to ensure they run in a timely fashion and mitigate the risk of error.
It’s likely that if you are reading this article, you’ve already done some development in Spark. If you haven’t you can take my free course here to learn the core concepts of the language. Assuming you have some experience, the below command should look familiar to you.
Here, we are calling the spark-submit command to run our .py file. The logs are output to stdout.log and stderr.log, so we don’t end up with a 10,000 line terminal output.
spark-submit yourfile.py >stdout.log 2>stderr.log
So, you’ve run your job using the above and it’s either erroring due to platform constraints or it’s taking a long time. What can we do to improve it?
To understand that, we must first understand the components of Spark. The below image (from the Apache website), outlines the key components.
The Driver is the process running the main() function from within your Spark application and it’s responsible for creating the Spark Context/Spark Session.
The Cluster manager, in most caes is YARN, which allocates resources to the application run by the driver.
The worker node is the node/server which runs your application code across the cluster.
The executor is the process running on the worker node, which runs our tasks – it can keep the data in memory/on disk.
Set Driver Memory
To improve performance, we can tune parameters associated to each one of these components. First off, let’s look at how we can tune the CPU and memory assigned to each component. In the below, I have tuned the amount of memory assigned to the driver; i’ve set it to 10GB.
spark-submit --driver-memory 10g filename.py >stdout.log 2>stderr.log
Set Executor Memory
This may not help a lot, so let’s now provide the executor with more memory – remember, this component can keep our data in memory, so the more you provide, the more performant the job should be. In short, the memory controls how much data Spark can cache. Every executor will be assigned the same, fixed amount of memory on execution. In the below, we have now assigned out driver 10GB of memory and each executor 150GB.
spark-submit --driver-memory 10g --executor-memory 15g filename.py >stdout.log 2>stderr.log
Set Executor Cores
Still, we may find performance problems. We can go further. In the below, I have set the executor cores to 2. Each executor will now have 2 cores assigned. The number of cores controls the number of concurrent tasks an executor can run. So, this would mean, the executor can run a maximum of 2 tasks at the same time.
spark-submit --driver-memory 10g --executor-memory 15g --executor-cores 2 filename.py >stdout.log 2>stderr.log
Control the number of executors
The big question is, how many executors are you going to need? Well, we can set the dynamic allocation property for our job. This enables Spark to request more exeuctors when there is a backlog of tasks and release those executors when they are idle. This helps us to maximize the performance of our jobs, while managing the overall performance of the cluster.
spark-submit --conf "spark.dynamicAllocation.enabled=true" --driver-memory 10g --executor-memory 15g --executor-cores 2 filename.py >stdout.log 2>stderr.log
Control Serialization Approach
Often, we store our data in arrays, tables, trees, classes and other data structures, which are inefficient to transport. So, we serialise them – that is, we collapse the data into a series of bytes which contain just enough information to reconstruct the data in its original form when it reaches its destination.
We generally serialise data when we:
- Persist our data to a file
- Store our data in databases
- Transfer data through a network (e.g. to other nodes in a cluster)
Back in the 1990’s, there was a move away from a byte stream of data*, to make data serialisation both human readable and programming language agnostic – which is a real benefit when you communicate data between two systems that operate on different programming languages. Formats we could use include: XML, CSV and JSON.
These kind of formats, although human readable are less compact than a byte stream, but in general, that isn’t a huge issue as compute power is so cheap these days.
You can find out more in my article here.
spark-submit --conf "spark.dynamicAllocation.enabled=true" --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --driver-memory 10g --executor-memory 15g --executor-cores 2 filename.py >stdout.log 2>stderr.log
In some situations you may also need to tune network timeout parameters, but these will be obvious based on the error message you receive.
As a final note, I will mention that you can embed the config directly into the Spark job if you wish (instead of in the spark-submit command), in the below fashion.
def create_session(appname): spark_session = SparkSession\ .builder\ .config(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)\ .config(“spark.kryo.registrationRequired”, “false”)\ .appName(appname)\ .master(‘yarn’)\ .enableHiveSupport()\ .getOrCreate() return spark_session