Fault Tolerant Friday: Creating robust pipelines in Python

When you’re dealing with tons of different data sources, issues crop up all the time: the data source hasn’t been populated, the schema has changed, the schedule has shifted, or the file format has changed.

The key with our jobs is to build in enough checks to, at the very least, warn us when something fails, so we can proactively rectify the issue rather than waiting for disgruntled end users to bring it to our attention.

There is no data in the source

I cannot tell you how frequently this happens. My script runs and there are no errors in the logs, but there is also no data in my data sink (the output location). Why? Because the script queried the source, which returned 0 rows. My script producing no output was therefore not an error; the job succeeded based on the data it was given.

So clearly having no errors in this situation is not ideal. It’s important to handle these scenarios appropriately. Let’s look at an example. Below, I have defined my dataframe & schema.

Next, we’re going to create a variable called ‘rows’, set to the total row count of our dataframe. We can then build in some logic:

  1. If there is data, continue the pipeline.
  2. If there is no data, take an action.
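As a sketch of that check, assuming a PySpark job where the count would come from `df.count()` (the function name and source name below are hypothetical, and a stand-in count is used so the example runs without a Spark cluster):

```python
def handle_row_count(rows: int, source: str) -> bool:
    """Return True if the pipeline should continue, False if the source was empty."""
    if rows > 0:
        return True  # data is present: carry on with the pipeline
    # No data: take an action. Here we just log a warning;
    # a real job might email the support team or trigger a re-run.
    print(f"WARNING: source '{source}' returned 0 rows")
    return False

# With Spark this would be: rows = df.count()
rows = 0  # stand-in value for illustration
if not handle_row_count(rows, "daily_feed"):
    pass  # stop the pipeline or kick off remediation here
```

The point is simply that an empty source becomes an explicit, visible event rather than a silent success.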

The action you take will be determined by your own requirements. In my jobs, I automate an email to the support team responsible for the jobs that output the (now missing) data; but you might choose to re-run the failed job, or take some other action.
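One way to sketch the email action is with Python’s standard-library `email.message` and `smtplib` modules (the addresses and SMTP host below are hypothetical placeholders, not anything from my jobs):

```python
from email.message import EmailMessage

def build_alert(source: str, to_addr: str) -> EmailMessage:
    """Build an alert email for an empty source. Addresses are hypothetical."""
    msg = EmailMessage()
    msg["Subject"] = f"Pipeline warning: no data in source '{source}'"
    msg["From"] = "pipeline-alerts@example.com"
    msg["To"] = to_addr
    msg.set_content(
        f"The scheduled job found 0 rows in '{source}'. "
        "Please check the upstream job that populates it."
    )
    return msg

alert = build_alert("daily_feed", "support-team@example.com")
# To actually send it, hand the message to an SMTP server, e.g.:
# import smtplib
# with smtplib.SMTP("smtp.example.com") as s:
#     s.send_message(alert)
```

Keeping the message construction separate from the send makes the alert easy to test without a mail server.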

Schema different from expected

The second issue you may face is the source data schema differing from the schema you have defined. This can happen for a number of reasons: for example, a currency field might arrive as $1000 one day and 1000 the next, so you need to be able to handle both string and integer values.

You can use Spark’s inferSchema option to handle this gracefully. However, I like to be in control of how the data is interpreted, as it removes the risk of categorizing data incorrectly. Hence my solution is simply to ingest everything as a StringType. As you progress through the flow of your ETL pipeline, you can cast values to int or float when required, but having StringType() as the field datatype guarantees that you can ingest the data and mitigates the risk of failure.
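To illustrate the cast-later step in plain Python (in Spark itself you would typically do this with `regexp_replace` plus `.cast("int")` on the column; the helper below is a hypothetical stand-in for that step):

```python
def cast_currency(value: str) -> int:
    """Cast a field ingested as StringType to int, tolerating an
    optional leading currency symbol and thousands separators."""
    cleaned = value.strip().lstrip("$").replace(",", "")
    return int(cleaned)

# Both representations of the same amount now cast cleanly:
cast_currency("$1000")  # -> 1000
cast_currency("1000")   # -> 1000
```

Because ingestion never tries to guess the type, a change in the upstream formatting fails (if at all) at this one well-defined cast, not at load time.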

Ultimately, there is no right or wrong answer for how you handle these issues. What I try to do with my jobs is provide a warning that something has failed; if you don’t build this in, you won’t know about the failure until you have a whole bunch of angry customers chasing you down.