Managing small file issues when writing to Hive with Spark SQL

Hive sits on top of HDFS (the Hadoop Distributed File System). It reads the files that reside on HDFS into a specified schema (column names & types), which we can then query and interact with. One way we can interact with Hive is through Apache Spark.

When we write to HDFS, we often end up writing tonnes of small files (files smaller than the HDFS block size). The HDFS block size defaults to 128MB in current Hadoop versions (it was 64MB in older ones), and writing files well under this size can cause problems. This is because HDFS is not designed to handle a large number of small files: each file adds metadata overhead on the NameNode, and reading many small files means a lot of hopping between data nodes to retrieve each one, which is extremely inefficient.

When we work with the dataframes API, life is quite simple. We can make sure that we don't end up with a large number of small files by running coalesce or repartition on the dataframe before writing – resulting in a smaller number of partitions and hence fewer files being written. The same is not true for Spark SQL.
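For comparison, here is a minimal sketch of the dataframe approach – the table names are just placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('dataframe_coalesce_example').enableHiveSupport().getOrCreate()

# read a (hypothetical) Hive table as a dataframe
df = spark.table('database.table')

# collapse the data down to a single partition so only one file is written out
df.coalesce(1).write.mode('overwrite').saveAsTable('database.compacted_table')

As a side note, coalesce avoids a full shuffle where it can, whereas repartition always shuffles – so coalesce is usually the cheaper choice when you're only reducing the partition count.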

Here, we need to use a slightly different approach. First, as you can see in the script below, we set spark.sql.shuffle.partitions to 1. This means that after each shuffle (for example, a group by), you will be left with a single partition.

from pyspark.sql import SparkSession

# build a Spark session with a single shuffle partition
spark = SparkSession \
    .builder \
    .appName('fixing_spark_small_files') \
    .config('spark.sql.shuffle.partitions', 1) \
    .master('yarn') \
    .enableHiveSupport() \
    .getOrCreate()

Next, we use the DISTRIBUTE BY clause in the actual query. In this example, I have created a field called dist, which holds a static value – it's the same for every single row in the dataframe. When I distribute by that field, Spark sends all records to the same reducer – leaving us with a single partition of data.

df.registerTempTable('output')

query = spark.sql('''
    insert overwrite table database.table partition(date = '{}', hour = '{}')
    select * from (
        select *, 'static_value' as dist from output
    ) t1
    distribute by dist
'''.format(date, hour))

Note that in practice you would list the target table's columns explicitly in the outer select, so that the helper dist column doesn't get inserted into the table.

This does, in effect, repartition your data, moving it all to a single reducer and writing a single file as output. This will make subsequent queries against your newly written data more efficient.
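If you want to sanity-check the result, one option (a sketch, assuming Spark 2.0+ and hypothetical partition values) is to read the new partition back and count the underlying files with inputFiles():

# read back the freshly written partition (partition values are placeholders)
written = spark.sql("select * from database.table where date = '2021-01-01' and hour = '00'")

# inputFiles() returns a best-effort list of the files backing the dataframe;
# after the distribute by trick, this should report a single file
print(len(written.inputFiles()))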

Kodey