Small files can be a serious performance bottleneck in Apache Spark and Hadoop, because processing them consumes extra computational resources and time. A small file is typically one that is significantly smaller than the HDFS block size (commonly 128MB). While small files do not take up much space on a distributed file system, they cause other problems, such as longer execution times and a larger number of I/O requests.
When dealing with small files, Hadoop and Spark must track and read the metadata for each file separately: the HDFS NameNode keeps an in-memory entry for every file and block, and each file read carries its own overhead. With a large number of small files this becomes expensive, and the extra metadata handling can seriously reduce the overall performance of the system.
Another issue is that small files increase network traffic between the compute nodes and the storage system: each file must be located, opened, and transferred individually, and that per-file I/O overhead has to be paid before the data can be processed.
In addition, small files cause an imbalance in resource utilization. The fixed overhead of scheduling a task and opening a file is roughly the same whether the file is tiny or a full block, so processing power is spread across many more tasks that each do very little work, leading to an overall drop in efficiency.
To address the small file problem in Apache Spark and Hadoop, organizations can combine smaller files into larger ones, apply compression, or run a pre-processing step (for example a MapReduce or Spark compaction job) to consolidate files. These strategies reduce the time spent handling small files and improve the overall performance of the system.
Resolving Small File Issues
Resolving small file issues in Apache Spark and Hadoop can be a challenging task. The number of small files stored in HDFS can quickly become unmanageable and cause performance problems. Fortunately, there are a few ways to tackle small files in Apache Spark and Hadoop.
First, consider compressing the files. This is a straightforward way to reduce the amount of disk space being taken up by small files and also increase performance. Apache Spark and Hadoop both support the common compression formats, such as GZIP, BZIP2, and LZO.
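As a rough sketch, the snippet below reads some data and writes it back out compressed; the paths, file format, and codec choices are hypothetical placeholders rather than a prescribed setup.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('write_compressed').getOrCreate()

# Hypothetical input location; substitute your own HDFS path.
df = spark.read.csv('hdfs:///data/raw/events', header=True)

# For text-based formats the 'compression' option accepts codecs such as
# gzip or bzip2; columnar formats like Parquet commonly use snappy.
df.write.option('compression', 'gzip').csv('hdfs:///data/events_gzip')
df.write.option('compression', 'snappy').parquet('hdfs:///data/events_parquet')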
Another approach is to use the Hadoop MapReduce framework to combine multiple small files into larger files. This will reduce the number of files and make them easier to manage.
The last approach is to use Apache Spark’s built-in coalesce and repartition functions. These functions reduce the number of partitions in a dataframe, and therefore the number of files written out, which can drastically cut the time spent processing your data.
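For illustration, a small compaction job built on these functions might look like the sketch below; it assumes an existing SparkSession named spark, a hypothetical input directory full of small Parquet files, and a hypothetical output directory.
# Read a directory that currently contains many small Parquet files
# (hypothetical path).
df = spark.read.parquet('hdfs:///data/small_files/')

# coalesce(8) merges the existing partitions down to 8 without a full
# shuffle; use repartition(8) instead if the data should be spread evenly
# across the new partitions.
df.coalesce(8).write.mode('overwrite').parquet('hdfs:///data/compacted/')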
In the context of Apache Hive
Hive sits on top of HDFS (the Hadoop Distributed File System). It maps the files that reside on HDFS to a specified schema (column names & types), which we can then query and interact with. One way we may interact with Hive is through Apache Spark.
When we write to HDFS, we often end up writing tonnes of small files (files smaller than the HDFS block size). The block size defaulted to 64MB in older Hadoop releases and is 128MB in Hadoop 2 and later; writing files well under this size can cause problems, because HDFS is not designed to handle a lot of small files. When reading in a large number of them, the platform has to do a lot of hopping between data nodes to retrieve each small file, which is extremely inefficient.
When we work with the dataframes API, life is quite simple. We can make sure we don’t end up with a large number of small files by running coalesce or repartition on the dataframe, resulting in a smaller number of partitions and hence fewer files being written, as in the sketch below.
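A minimal sketch, assuming df is the dataframe we are about to write and that the Hive table database.table already exists with a matching schema (both names are placeholders):
# Collapse the dataframe to a single partition before writing, so the
# insert produces one output file.
df.coalesce(1).write.insertInto('database.table', overwrite=True)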
The same is not true for Spark SQL, where we need a slightly different approach. First, as you can see in the script below, we set spark.sql.shuffle.partitions to 1. This means that after each group by operation you will be left with a single partition.
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName('fixing_spark_small_files')\
    .config("spark.sql.shuffle.partitions", 1)\
    .master('yarn')\
    .enableHiveSupport()\
    .getOrCreate()

# yday_date is assumed to be defined earlier (e.g. yesterday's date as a
# string) and controls which partition(s) get rewritten.
dates = [yday_date]
Next, we use the DISTRIBUTE BY clause in the actual query. In this example, I have created a field called dist, which holds a static value, so it is the same for every single row in the dataframe. When I distribute by that field, all records are sent to the same reducer, leaving us with a single partition of data.
# df, date and hour are assumed to be defined earlier in the job; the target
# table is assumed to include the dist column.
df.createOrReplaceTempView('output')
query = spark.sql("""insert overwrite table database.table partition(date = '{d}', hour = '{h}')
select * from (select *, 'static_value' as dist from output) t1
distribute by dist""".format(d=date, h=hour))
This effectively repartitions your data, moving it all to a single reducer and writing a single file as output, which will make subsequent queries against your newly written data more efficient.
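If you want to sanity-check the result, one option is to read the freshly written partition directory back and count the files behind it. The warehouse path below is a hypothetical placeholder, the table is assumed to be stored as Parquet, and DataFrame.inputFiles() requires PySpark 3.1 or later.
# Hypothetical warehouse path for the partition written above.
check = spark.read.parquet('hdfs:///warehouse/database.db/table/date=2021-01-01/hour=09')
print(len(check.inputFiles()))   # expect a single file after the rewrite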