
Spark Saturdays: Spark Structured Streaming, Data Sources and Sinks

In Spark Structured Streaming, we have the concept of sources and sinks. A source is very much what you would expect it to be: it’s the source of the data we will be ingesting into our stream. A sink is the destination for that data, the L (load) part of the ETL process.

Sources

Looking at file-based sources (e.g. csv, text, json, parquet), when we define a source we must of course define the path to the directory in which we will find the files to ingest. When you do that, remember that you need to define the path as either hdfs:///filepath or file:///filepath. If you are running your script on a standalone server and not reading from HDFS, then you should use file:/// to tell the script to check the local file system, rather than heading off to HDFS, where your files undoubtedly won’t exist.
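As a rough sketch of what that looks like in PySpark (the path, schema and column names below are made up purely for illustration), a file-based source is defined with readStream:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("streaming-sources-example").getOrCreate()

# File sources need an explicit schema unless streaming schema inference is enabled (see below)
schema = StructType([
    StructField("user_id", StringType()),
    StructField("event", StringType()),
    StructField("amount", IntegerType()),
])

# file:/// points Spark at the local file system; swap in hdfs:/// when reading from HDFS
events = (spark.readStream
          .format("csv")
          .schema(schema)
          .option("header", "true")
          .load("file:///data/incoming/"))
```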

We can use glob paths for our path too. This means we can use wildcards in the directory definition.
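For example (reusing the schema from the sketch above, with made-up directory names), a wildcard lets us pick up every matching sub-directory in one go:

```python
# Glob pattern in the path: ingest every dated sub-directory under /data/incoming
events = (spark.readStream
          .format("csv")
          .schema(schema)  # schema as defined in the previous snippet
          .option("header", "true")
          .load("file:///data/incoming/2024-*/"))
```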

Now we’ve identified the path to our data, we need to define the file format. This is simply csv, json, text, parquet, orc or another supported file format.

We can optionally set the latestFirst option, which processes the newest files first – which is useful if you have a whole load of backlog that needs processing. You can process new files first to get the insight and gradually work backwards to populate the full history.
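As a sketch, latestFirst sits alongside the format and path options (the directory here is a placeholder):

```python
# latestFirst tells the file source to pick up the newest files first
backlog = (spark.readStream
           .format("json")
           .schema(schema)  # schema as defined earlier
           .option("latestFirst", "true")
           .load("file:///data/backlog/"))
```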

Usually, when we bring data into Spark dataframes, we can rely on its schema inference. It’s not always 100% correct on datatypes, but it does make life that little bit easier. By default, this is switched off in Spark Structured Streaming – that is to help make sure that we have a consistent schema applied to data that is constantly flowing in. We can however re-enable the inference by setting spark.sql.streaming.schemaInference to true.
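A minimal sketch of turning that on (the path is again a placeholder):

```python
# Allow streaming file sources to infer their schema (off by default)
spark.conf.set("spark.sql.streaming.schemaInference", "true")

# With inference enabled, we no longer have to pass .schema() explicitly
inferred = (spark.readStream
            .format("json")
            .load("file:///data/incoming/"))
```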

Sinks

In our case, our sink could be a local directory, an HDFS directory or the directory on HDFS that corresponds to a Hive table. When we output, we need to define the file format – as above, this could be csv, text, json, parquet, orc, etc…
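Here is a minimal sketch of a parquet file sink (the directories are placeholders; the checkpoint location, covered below, is required for file sinks):

```python
# Write the stream out as parquet files in the target directory
file_sink = (events.writeStream
             .format("parquet")
             .option("path", "file:///data/output/events/")
             .option("checkpointLocation", "file:///data/checkpoints/events/")
             .outputMode("append")  # file sinks use append mode
             .start())
```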

We then set an output mode, which could be append, complete or update:

  • Append: Only the new rows added to the result since the last trigger are written to your sink (output directory)
  • Complete: The entire result table is written to the sink every time there is an update
  • Update: Only the rows that were updated since the last trigger are written to the sink

We also give the query a name, which is the name used for the in-memory table that Spark produces when we write to the memory sink, and we require a checkpoint location. Read more about checkpointing here.
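Pulling that together, here is a sketch using the memory sink, where the query name becomes an in-memory table we can query with SQL (the events dataframe and all names come from the made-up examples above):

```python
# A simple aggregation so that the complete output mode makes sense
counts = events.groupBy("event").count()

query = (counts.writeStream
         .format("memory")            # results land in an in-memory table
         .queryName("event_counts")   # the name of that in-memory table
         .outputMode("complete")      # rewrite the full result table on every trigger
         .option("checkpointLocation", "file:///data/checkpoints/event_counts/")
         .start())

# The query name can now be used like any other table
spark.sql("SELECT * FROM event_counts").show()
```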
