In this article I am going to cover how to create a simple streaming log monitor using Spark Streaming. In subsequent posts, I’ll go deeper on some of these aspects.
The first thing we need to do, as always is to create a Spark Session. There isn’t much new here, except that we import the StreamingContext and we also setup a Spark Streaming Context (ssc).
In the ssc definition, we have ssc = StreamingContext(spark, 1) – the 1 here means that the files within the directory we’re going to watch will be batched every 1 second. That means, how ever many files arrive within a 1 second window will be processed in a batch.
Next, we’re going to read in our log files. Here, as we would for a normal dataframe, we define a schema. We then define the dataframe as reading a stream of text files with that schema.
In the below, I also create a new field called filename, which will be the full path of the log file, so we can identify exactly which log reported the error.
Next, I apply my transformations. In this case, it’s a simple UDF, which says ‘if the log file includes this text THEN return a specific message’. This will help us extract the most common errors from the logs.
Finally, we define the output directory and format that we require for our stream. This is going to write the dataframe as a CSV.
Note the checkpoint directory – this stores the state of the stream (i.e. which files it’s already processed.
A production-grade streaming application must have robust failure handling. In Structured Streaming, if you enable checkpointing for a streaming query, then you can restart the query after a failure and the restarted query will continue where the failed one left off, while ensuring fault tolerance and data consistency guarantees. Hence, to make your queries fault tolerant, you must enable query checkpointing and configure jobs restart your queries automatically after a failure.Databricks Website
The output mode could be complete, update or append:
- Append: New rows will be written to your sink (output directory)
- Complete: all rows written every time there is an update
- Update: Only updated rows are added to the sink
We require a query name, which is the name for the in-memory table that Spark produces.
Finally, we have some format options: csv, text, parquet, json, console (which can be used for debugging, memory (if you want to store as in memory table).
That’s a quick walk through a Spark Streaming script. In later articles, I’ll cover what happens under the hood.