What is JSON?
JSON (JavaScript Object Notation) is a lightweight, text-based format for representing structured data as key–value pairs and ordered lists. It is human-readable, language-independent, and has become the de facto standard for exchanging data between web services.
What is Nested JSON?
Nested JSON is a data structure in which a JSON object contains other JSON objects (or arrays of objects) as values. It is commonly used to store and transmit data between applications, and it can represent complex structures such as hierarchical or object-oriented data. Because each nested object can contain its own set of objects, nested JSON is a powerful way to store large amounts of information.
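As a minimal illustration (the field names and values here are invented), a nested JSON document might look like this when parsed in plain Python:

```python
import json

# A small, invented example of a nested JSON document: the top-level object
# contains a nested "address" object and a list of nested "orders" objects.
doc = """
{
  "name": "Alice",
  "address": {"city": "Albany", "state": "NY"},
  "orders": [
    {"id": 1, "total": 9.99},
    {"id": 2, "total": 24.50}
  ]
}
"""

parsed = json.loads(doc)
print(parsed["address"]["city"])     # Albany  (a value inside a nested object)
print(parsed["orders"][0]["total"])  # 9.99    (a value inside a nested array of objects)
```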
In our example data (from New York State, available here), our JSON document has two components. The first is called ‘meta’, which includes tonnes of information related to the dataset, and the second is ‘data’, which is the payload (the actual records).
A snippet from Meta:
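As an illustrative sketch (abridged, with placeholder values rather than the real contents), the ‘meta’ section of a Socrata rows.json export has roughly this shape:

```json
{
  "meta": {
    "view": {
      "id": "jxy9-yhdk",
      "name": "...",
      "columns": [
        { "id": -1, "name": "sid", "fieldName": ":sid" },
        { "id": -1, "name": "id", "fieldName": ":id" }
      ]
    }
  }
}
```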
A snippet from Data:
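As an illustrative sketch (values invented, but the column order matches the schema we define later), each entry in ‘data’ is an array of field values rather than a keyed object:

```json
{
  "data": [
    ["row-xxxx.0001", "00000000-0000-0000-0000-000000000000", 0, 1500000000, null,
     1500000000, null, "{ }", "2013", "EXAMPLE", "ALBANY", "F", "10"]
  ]
}
```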
How do we deal with this in PySpark?
First, if we are using Google Colab to test this, then we can use the below script to configure Google Colab to run PySpark.
```python
# Run in a Google Colab cell: lines prefixed with ! are shell commands.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)  # Property used to format output tables better
spark
```
Now we have our Spark session, we need to grab our JSON data.
```python
from urllib.request import urlopen

jsonData = urlopen("https://health.data.ny.gov/api/views/jxy9-yhdk/rows.json").read().decode('utf-8')
```
Next, we need to get that data ready to be read into a dataframe in Spark. The parallelize function creates an RDD from the Python object.
Note: An RDD (Resilient Distributed Dataset) is the fundamental data structure of PySpark. It is an immutable distributed collection of objects. Each dataset in an RDD is divided into logical partitions which may be computed on different nodes of the cluster. RDDs can be created through parallelizing existing collections of data or by transforming other RDDs.
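As a rough analogy in plain Python (this is not Spark internals, just the idea), partitioning splits a collection into chunks that could each be processed on a different node:

```python
# A plain-Python sketch of the partitioning idea behind RDDs (an analogy,
# not Spark's actual implementation): split a collection into roughly
# equal chunks that can be processed independently.
def partition(collection, num_partitions):
    size, remainder = divmod(len(collection), num_partitions)
    chunks, start = [], 0
    for i in range(num_partitions):
        # Spread any remainder across the first few partitions.
        end = start + size + (1 if i < remainder else 0)
        chunks.append(collection[start:end])
        start = end
    return chunks

parts = partition(list(range(10)), 3)
print(parts)  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```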
```python
rdd = spark.sparkContext.parallelize([jsonData])
```
We can now read this data into a dataframe:
```python
df = spark.read.json(rdd, multiLine=True)
df.show(5)
```
The problem is, it’ll look something like the below, which is really unusable.
So we need to do a little more work to make this usable. We need to define a custom schema for our ‘data’ field, as PySpark may not infer it correctly, and then build a dataframe from only that field using the schema we provide.
The collect() statement returns the rows of the dataframe as a Python list of Row objects. Our document produces a single row, so we take the first element of that list and then access its ‘data’ key.
```python
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("sid", StringType(), True),
    StructField("id", StringType(), True),
    StructField("position", StringType(), True),
    StructField("created_at", StringType(), True),
    StructField("created_meta", StringType(), True),
    StructField("updated_at", StringType(), True),
    StructField("updated_meta", StringType(), True),
    StructField("meta", StringType(), True),
    StructField("year", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("county", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("count", StringType(), True)
])

# collect() returns a list of Row objects; our dataframe has a single row,
# so take the first element ([0]) before accessing its 'data' key.
data = spark.createDataFrame(df.select("data").collect()[0]['data'], schema)
```
You’re now in a position where your data is in the dataframe, ready to use!
Can AI generate this code?
Google Bard suggested the alternatives below to what we have done above. Although interesting, neither piece of code would run without IOPub errors (which was not the case with our code above), and some of the options it references, such as `key-mapper`, do not exist in the PySpark API.
This line of code reads the JSON file `jsonData` into a Spark DataFrame using the `read.json` method. The `option("key-mapper", lambda x: x["data"].keys()).take(1)` option specifies that the DataFrame should only contain the keys of the `data` object in each row. The `take(1)` option specifies that the DataFrame should only contain the first row.
The `read.json` method takes a number of options which, according to Bard, can be used to control how the JSON file is read. Some of the most common options include:
- `path`: The path to the JSON file.
- `format`: The format of the JSON file.
- `header`: Whether or not to read a header from the JSON file.
- `key-mapper`: A function that maps each key in the JSON file to a new key.
- `value-mapper`: A function that maps each value in the JSON file to a new value.
- `take`: The number of rows to read from the JSON file.
The `key-mapper` and `value-mapper` options can be used to customize how the JSON file is read. For example, the `key-mapper` option can be used to extract only the keys of the `data` object in each row, and the `value-mapper` option can be used to convert the values in each row to a different type. The `take` option can be used to limit the number of rows that are read from the JSON file; for example, if it is set to 1, only the first row will be read.
```python
from pyspark.sql.functions import key_mapper

# Read the JSON file into a DataFrame
df = spark.read.json(jsonData)

# Use the key_mapper function to map the keys of the data object to new keys
df = df.withColumn("keys", key_mapper(df["data"].keys()))

# Take the first row
df = df.take(1)
```
It also proposed:
```python
from pyspark.sql.functions import map

# Read the JSON file into a DataFrame
df = spark.read.json(jsonData)

# Use the map function to map the keys
df = df.withColumn("new_keys", map(df["data"].keys, lambda x: x))

# Take the first row
df = df.take(1)
```
For reference, Bard describes a key-mapper in Spark as a function that maps each key in an RDD of key-value pairs to a new key; the new key is then used to partition the RDD and distribute the data across multiple executors.
The key-mapper function is defined by the user and can be any function that takes a key and returns a new key. The new key must be of the same type as the original key.
According to Bard, the key-mapper function is used in a number of Spark operations, including:
- Map: The map operation applies a function to each key-value pair in an RDD and returns a new RDD with the same number of partitions. The new RDD is partitioned by the new key.
- Filter: The filter operation applies a predicate to each key-value pair in an RDD and returns a new RDD with the same number of partitions. The new RDD is partitioned by the original key.
- GroupByKey: The groupByKey operation groups the key-value pairs in an RDD by key and returns a new RDD with the same number of partitions. The new RDD is partitioned by the key.
Bard concludes that the key-mapper function is a powerful tool for improving the performance of Spark applications: by using it, you can partition your data more efficiently and distribute it across multiple executors. Note, however, that no key-mapper function actually exists in the PySpark API.
Google Bard did analyze my code and suggest performance improvements. All of that is great, but I think only the first suggestion below is valid. Yes, we could upgrade the Spark version, but using a different data format for ingest is a strange suggestion: usually we are ingesting JSON because that’s the format the data is in at source. In Bard’s words, here are a few things you can do:
- Use a faster JSON parser. The `urllib.request.urlopen` function is not the fastest way to parse JSON data. You can use a faster library like `fastjson`.
- Use a faster Spark version. The latest version of Spark is faster than the previous version. You can upgrade to the latest version of Spark to improve the efficiency of your code.
- Use a better data format. The JSON format is not the best format for storing data in Spark. You can use a better format like Parquet or ORC to improve the efficiency of your code.
Here is an example of how you can use a faster JSON parser:
```python
import fastjson as json

data = json.loads(jsonData)

# Create a DataFrame from the data
df = spark.read.json(data)
```