Handling data skew / imbalanced partitions in PySpark

Data skew is a real problem in Spark. It seems like the sort of thing that should be solved automatically by the job manager but, unfortunately, it isn’t. Skew is where one partition contains far more data than the others, leaving one executor to process a lot of data while the others work on small datasets. The symptom of imbalanced partitions is that 99 out of 100 tasks finish quickly and the final task runs and runs and runs…

In this article, I will talk about a couple of approaches to solving this. It won’t cover techniques like broadcast joins; the focus is on repartitioning the whole dataset.

Repartitioning

When we have imbalanced partitions, we have a few ways to handle it. The most straightforward is to repartition the dataset. In the example below, I have done so on df1, partitioning it by Ship Date and Ship Mode. This overrides the default partitioning and splits the data at a more granular level, across more than one field.

# Read the CSV, then hash-partition the rows on two columns.
df1 = spark.read.format("csv").option("header", "true").load("sales1.csv").repartition("Ship Date", "Ship Mode")
df1.show()

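If we look at how the partitions are balanced, you can see it’s pretty good: there is no single partition with a disproportionate amount of data. Note that show() prints only the first 20 rows, so the table below lists the 20 smallest partitions.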

from pyspark.sql.functions import spark_partition_id, asc

# Tag each row with the ID of the partition it lives in, then count rows per partition.
df1\
    .withColumn("partitionId", spark_partition_id())\
    .groupBy("partitionId")\
    .count()\
    .orderBy(asc("count"))\
    .show()

+-----------+-----+
|partitionId|count|
+-----------+-----+
|        133|   11|
|         99|   16|
|         91|   16|
|        145|   17|
|         76|   17|
|        103|   18|
|         78|   18|
|         44|   22|
|         53|   23|
|         65|   23|
|        175|   24|
|        156|   24|
|        159|   24|
|         74|   24|
|        167|   25|
|        192|   25|
|        158|   25|
|        102|   26|
|        198|   26|
|        164|   27|
+-----------+-----+

In the example below, I have not repartitioned the dataset. It’s the same input data.

df2 = spark.read.format("csv").option("header", "true").load("sales1.csv")
df2.show()

In this instance, all of the data has been placed into a single partition. This is because the dataset is very small and fits comfortably within one partition. However, the point of this exercise was to demonstrate 1) how you can repartition your dataframes and 2) how you can easily see the record count per partition.

from pyspark.sql.functions import spark_partition_id, asc

# Count the rows that landed in each partition of the un-repartitioned dataframe.
df2\
    .withColumn("partitionId", spark_partition_id())\
    .groupBy("partitionId")\
    .count()\
    .orderBy(asc("count"))\
    .show()

+-----------+-----+
|partitionId|count|
+-----------+-----+
|          0| 9994|
+-----------+-----+

Sometimes, your dataset may not have fields which make it easy to partition. For example, you may have a dataset where you have 1,000,000,000 rows per day. There is no hour column and no other granular way to break down your dataset.

In this instance, you could take the order timestamp (for example, 2020/01/01 15:50) and derive new columns from it: Year | Month | Day | Hour. You can then partition by these four fields to create smaller, more balanced partitions. If that isn’t suitable, we can consider salting!

Salting Your Data

If you find that you have unbalanced partitions and you don’t have a particularly suitable key to partition on, you can use salting. Here, we add randomization to our keys so that the rows are distributed more evenly. Let’s say we have the below data:

StoreA | 109 columns of data...
StoreA | 109 columns of data...
StoreA | 109 columns of data...
StoreA | 109 columns of data...
StoreA | 109 columns of data...
StoreA | 109 columns of data...
StoreA | 109 columns of data...
StoreA | 109 columns of data...
StoreA | 109 columns of data...
StoreA | 109 columns of data...
StoreB | 109 columns of data...
StoreB | 109 columns of data...

Here, we can see that the majority of the data relates to StoreA. So if we were to partition this data by store ID, we would end up with skewed partitions and a disproportionate amount of the processing work would fall onto one executor.

When we salt our data, we create a new column in our dataframe which holds a random integer from 1 to n (n being the number of partitions you require). We can then concatenate this with our key; when we now repartition by key, we do so by our new, salted value.

StoreA1 | 109 columns of data...
StoreA2 | 109 columns of data...
StoreA3 | 109 columns of data...
StoreA4 | 109 columns of data...
StoreA5 | 109 columns of data...
StoreA6 | 109 columns of data...
StoreA7 | 109 columns of data...
StoreA8 | 109 columns of data...
StoreA9 | 109 columns of data...
StoreA10 | 109 columns of data...
StoreB1 | 109 columns of data...
StoreB2 | 109 columns of data...

There are plenty of other ways to solve your skew problem, and the approaches above are by no means a magic solution. Remember, repartitioning a large dataset is an expensive operation: repartition performs a full shuffle, and moving that data around the cluster is slow. coalesce avoids a full shuffle by merging existing partitions, but it can only reduce the number of partitions and will not rebalance skewed ones. These are just two selected approaches to resolving your data skew troubles.

Kodey