Handling Nested JSON data in PySpark

What is JSON?

JSON, or JavaScript Object Notation, is a lightweight, language-independent data-interchange format for exchanging data between systems. It is easy to read, write and parse, which makes data exchange faster than with many heavier formats, and it is widely used in web applications to transfer data between a server and a client. Because JSON can be easily serialized and deserialized (transformed between data formats), it is a popular choice for moving data between applications and services.
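As a quick illustration of serialization and deserialization, Python's standard `json` module converts between native objects and JSON strings (the record below is made up for the example):

```python
import json

# Serialize a Python dictionary to a JSON string
record = {"first_name": "Emma", "county": "Albany", "count": 42}
payload = json.dumps(record)

# Deserialize the string back into a Python object
restored = json.loads(payload)
print(restored["first_name"])  # Emma
```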

What is Nested JSON?

Nested JSON is a type of data structure where an object contains multiple nested objects within it. It is commonly used to store data or to transmit data between applications. It can be used to represent complex data structures such as hierarchical data or object-oriented data. Each nested object can also contain its own set of objects, making nested JSON a very powerful way to store large amounts of information.
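A minimal sketch of a nested document, loosely modelled on the meta/data layout used later in this post (the field names here are illustrative, not the real dataset's):

```python
import json

nested = """
{
  "meta": {"view": {"name": "Baby Names", "category": "Health"}},
  "data": [["row-1", "2007", "Emma", "Albany"]]
}
"""
doc = json.loads(nested)

# Objects nest arbitrarily deep; index through each level by key
print(doc["meta"]["view"]["name"])   # Baby Names
print(doc["data"][0][2])             # Emma
```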

In our example data (from New York State, available here), we have two components to our JSON document. The first, ‘meta’, includes a wealth of information about the dataset; the second, ‘data’, is the payload (the actual data).

A snippet from Meta:

A snippet from Data:

How do we deal with this in PySpark?

First, if we are using Google Colab to test this, we can use the script below to configure Colab to run PySpark.

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q  # download URL omitted here; it should fetch the spark-3.1.1-bin-hadoop3.2.tgz release tarball
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
findspark.init()  # make the Spark installation visible to Python

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)  # format output tables better

Now we have our Spark session, we need to grab our JSON data.

from urllib.request import urlopen

# Fetch the raw JSON document (the dataset URL is omitted here)
jsonData = urlopen("").read().decode('utf-8')

Next, we need to get that data ready to be read into a dataframe in Spark. The parallelize function creates an RDD from the Python object.

Note: An RDD (Resilient Distributed Dataset) is the fundamental data structure of PySpark. It is an immutable distributed collection of objects. Each dataset in an RDD is divided into logical partitions which may be computed on different nodes of the cluster. RDDs can be created through parallelizing existing collections of data or by transforming other RDDs.

rdd = spark.sparkContext.parallelize([jsonData])

We can now read this data into a dataframe:

df = spark.read.json(rdd, multiLine=True)

The problem is, it’ll look something like the below, which is really unusable.

So we need to do a little bit more work to make this usable – we need to define a custom schema for our ‘data’ field as PySpark may not be able to infer it correctly. We then need to make a dataframe from only the data field, using the schema we have provided.

The collect() call retrieves the DataFrame’s rows to the driver as a list of Row objects. Because this dataset has a single top-level row, we take the first element and access its ‘data’ key.
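To see what that collect() call hands to createDataFrame, you can mimic the extraction with the standard `json` module (the sample document below is made up for illustration):

```python
import json

# A tiny stand-in for the real document: one 'meta' object and a 'data' payload
sample = '{"meta": {"view": {"name": "Baby Names"}}, "data": [["row-1", "Emma"], ["row-2", "Liam"]]}'

# Equivalent of collect()[0]['data']: the list of raw rows under the 'data' key
rows = json.loads(sample)["data"]
print(rows[0])  # ['row-1', 'Emma']
```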

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("sid", StringType(), True),
    StructField("id", StringType(), True),
    StructField("position", StringType(), True),
    StructField("created_at", StringType(), True),
    StructField("created_meta", StringType(), True),
    StructField("updated_at", StringType(), True),
    StructField("updated_meta", StringType(), True),
    StructField("meta", StringType(), True),
    StructField("year", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("county", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("count", StringType(), True)
])

data = spark.createDataFrame(df.select("data").collect()[0]['data'], schema)

You’re now in a position that your data is in the dataframe, ready to use!

Can AI generate this code?

Google Bard suggested the alternatives below to what we have done above. Although interesting, neither of these pieces of code would run (both produced IOPub errors), which was not the case with our code above.

This line of code reads the JSON file jsonData into a Spark DataFrame using the read.json method. The option("key-mapper", lambda x: x["data"].keys()).take(1) option specifies that the DataFrame should only contain the keys of the data object in each row. The take(1) option specifies that the DataFrame should only contain the first row.

The read.json method takes a number of options, which can be used to control how the JSON file is read. Some of the most common options include:

  • path: The path to the JSON file.
  • format: The format of the JSON file.
  • header: Whether or not to read a header from the JSON file.
  • key-mapper: A function that maps each key in the JSON file to a new key.
  • value-mapper: A function that maps each value in the JSON file to a new value.
  • take: The number of rows to read from the JSON file.

The key-mapper and value-mapper options can be used to customize how the JSON file is read. For example, the key-mapper option can be used to extract only the keys of the data object in each row. The value-mapper option can be used to convert the values in each row to a different type.

The take option can be used to limit the number of rows that are read from the JSON file. For example, if the take option is set to 1, only the first row will be read.

from pyspark.sql.functions import key_mapper
# Read the JSON file into a DataFrame
df = spark.read.json("jsonData")
# Use the key_mapper function to map the keys of the data object to new keys
df = df.withColumn("keys", key_mapper(df["data"].keys()))
# Take the first row
df = df.take(1)

It also proposed:

from pyspark.sql.functions import map
# Read the JSON file into a DataFrame
df = spark.read.json("jsonData")
# Use the map function to map the keys
df = df.withColumn("new_keys", map(df["data"].keys, lambda x: x))
# Take the first row
df = df.take(1)

For reference, Bard describes a key-mapper as a function that maps each key in an RDD of key-value pairs to a new key; the new key is then used to partition the RDD and distribute the data across multiple executors. (No key-mapper option actually exists in PySpark’s DataFrame reader, which is part of why the code above fails.)

The key-mapper function is defined by the user and can be any function that takes a key and returns a new key. The new key must be of the same type as the original key.

The key-mapper function is used in a number of Spark operations, including:

  • Map: The map operation applies a function to each key-value pair in an RDD and returns a new RDD with the same number of partitions. The new RDD is partitioned by the new key.
  • Filter: The filter operation applies a predicate to each key-value pair in an RDD and returns a new RDD with the same number of partitions. The new RDD is partitioned by the original key.
  • GroupByKey: The groupByKey operation groups the key-value pairs in an RDD by key and returns a new RDD with the same number of partitions. The new RDD is partitioned by the key.

The key-mapper function is a powerful tool that can be used to improve the performance of Spark applications. By using the key-mapper function, you can partition your data more efficiently and distribute it across multiple executors.

Google Bard did analyze my code and suggest performance improvements.

All of that is great, but I think only the top option is valid: yes, we could upgrade the Spark version, but using a different data format for ingest is a strange suggestion – usually we ingest from JSON because that’s the format the data is in at source. Here are Bard’s suggestions:

  • Use a faster JSON parser. The urllib.request.urlopen function is not the fastest way to parse JSON data. You can use a faster library like json or fastjson.
  • Use a faster Spark version. The latest version of Spark is faster than the previous version. You can upgrade to the latest version of Spark to improve the efficiency of your code.
  • Use a better data format. The JSON format is not the best format for storing data in Spark. You can use a better format like Parquet or ORC to improve the efficiency of your code.

Here is an example of how you can use a faster JSON parser:

import fastjson as json
data = json.loads(jsonData)
# Create a DataFrame from the data
df = spark.createDataFrame(data)
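For what it’s worth, fastjson is not part of the standard library; the stdlib `json` module parses the same payload directly, and the ‘data’ list it returns is the same payload we fed to createDataFrame earlier (the sample string below is illustrative):

```python
import json

# Illustrative stand-in for the downloaded document
jsonData = '{"meta": {"view": {"name": "Baby Names"}}, "data": [["row-1", "2007", "Emma"]]}'

parsed = json.loads(jsonData)
rows = parsed["data"]  # the payload we would pass to spark.createDataFrame
print(len(rows))  # 1
```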