Overcoming Futures Timeout & Read Timeout errors in PySpark

This article covers a less-than-orthodox method for handling resource constraints in your PySpark code. Consider the following scenario: we load data from four sources, perform some joins and aggregations, and produce an output.

The problem is, we keep getting timeout errors because the data is just so large. After tuning the Spark parameters, we still can’t get the script to run. So what do we do?

Well, the data in source 1 and source 2 is partitioned by date. We need to process 30 days' worth of data but can't do it in one shot. Here, we can use a good old-fashioned loop.

In the code below, I set up my Spark session as normal. Within the main block, we loop through a list of dates, processing one date at a time. In the first iteration of the loop we create a new table and populate it with the first date; in each subsequent iteration we append to that table. Because each iteration reads only a single date partition, this has enabled us to overcome the timeout errors.

This is not a perfect solution, as you will now need to run an additional operation on the output table to aggregate for the month. However, in my experience, it is a good solution in environments where timeout errors cannot be overcome through script tuning or the tuning of Spark parameters (discussed here).

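from pyspark.sql import SparkSession


def create_session(appname):
    # Build (or reuse) a Hive-enabled Spark session running on YARN
    spark = SparkSession\
        .builder\
        .appName(appname)\
        .master('yarn')\
        .enableHiveSupport()\
        .getOrCreate()
    return spark

### START MAIN ###
if __name__ == '__main__':
    session_name = 'loop_test'
    spark = create_session(session_name)

    # dates is assumed to be a list of date strings defined earlier in the script
    for i, dt in enumerate(dates, start=1):
        # Read one date partition; the value is quoted on the assumption
        # that date is a string column
        source1 = spark.sql("select * from db.table where date = '" + dt + "'")
        # .... rest of script ....

        joined_source.createOrReplaceTempView('temp')
        if i == 1:
            # First iteration: create the output table
            output = spark.sql('create table db.table1 as select * from temp')
        else:
            # Later iterations: append to the existing table
            output = spark.sql('insert into db.table1 select * from temp')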
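
The loop above iterates over a list called dates, which the listing does not define. As a minimal sketch, assuming the partition values are strings in YYYY-MM-DD format, the list could be built with the standard library. The helper name build_dates and the start date are hypothetical and only for illustration; swap in whatever format and window your tables actually use.

```python
from datetime import date, timedelta


def build_dates(start, days):
    """Return `days` consecutive dates, beginning at `start`, as 'YYYY-MM-DD' strings."""
    return [(start + timedelta(days=n)).isoformat() for n in range(days)]


# Hypothetical 30-day window; replace the start date with your own.
dates = build_dates(date(2023, 1, 1), 30)
```

Generating the list up front keeps the loop body unchanged if the window later grows or shrinks.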