Python Spark Tutorial – How to add a schema inside a schema (nested StructType) in a Spark DataFrame?


Problem: 

Imagine you need to store information about employees, where each employee has multiple addresses, such as a current address and a permanent address. The ‘address’ field, which holds both addresses, must be tied to the main employee schema. In other words, the employee schema contains another, separately defined schema – the address schema. This nested structure lets you store multiple addresses for each employee.

Solution:

To solve this, we will define a separate address schema and bind it to the address field of the employee schema. The employee schema is then passed to Spark when creating the DataFrame.

In the code snippet below, we define a simple address schema with two attributes, permanent_address and current_address, both of string type and nullable (the third parameter of StructField).

self.address_schema = StructType([
    StructField("permanent_address", StringType(), True),
    StructField("current_address", StringType(), True),
])

Next, we define the employee schema, in which the address field is typed with another schema – the address schema.

In the code snippet below, observe that the address field uses the address_schema defined earlier as its data type.

# Define the main schema with a nested structure for address
self.employee_schema = StructType([
    StructField("unique_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("address", self.address_schema, True)
])
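
As a quick sanity check (an illustrative snippet, assuming the schema objects above are in scope), simpleString() renders the nesting compactly:

# Compact rendering of the nested schema; `address` appears as an inner struct
print(self.employee_schema.simpleString())
# struct<unique_id:string,name:string,age:int,address:struct<permanent_address:string,current_address:string>>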

Let’s create the data for the above schema:

self.data = [
    ("A001", 'Harsha', 20, ("Pune", "Mumbai")),
    ("A002", 'Shyam', 26, ("Pune", "Mumbai")),
    ("A003", 'Radha', 22, ("Pune", "Nashik")),
    ("A004", 'Sangeet', 19, ("Ahmedabad", "Mumbai"))
]
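
The tuples above rely on position: ("Pune", "Mumbai") fills permanent_address and current_address in that order. If you prefer named fields, the same rows can be written with Row objects (an equivalent sketch, not part of the original code):

from pyspark.sql import Row

# Same first record, with every field named explicitly
data = [
    Row(unique_id="A001", name="Harsha", age=20,
        address=Row(permanent_address="Pune", current_address="Mumbai")),
    # ... remaining employees follow the same pattern
]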

In this example, we built a nested structure for organizing employee information. Each employee has a unique ID, a name, an age, and an address that holds both a permanent address and a current address. With the nested schema in place, we used the sample data above to create a PySpark DataFrame that follows it.
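
Once the DataFrame is built (full code below), printSchema() should render the address field as a nested struct:

emp_df.printSchema()
# root
#  |-- unique_id: string (nullable = true)
#  |-- name: string (nullable = true)
#  |-- age: integer (nullable = true)
#  |-- address: struct (nullable = true)
#  |    |-- permanent_address: string (nullable = true)
#  |    |-- current_address: string (nullable = true)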

Full Code:

# spark_session_context is the author's helper module; it is expected to
# expose an active SparkSession named `spark`.
from spark_session_context import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


class CreateSparkDataFrame:
    def __init__(self):
        # Define the nested structure for address
        self.address_schema = StructType([
            StructField("permanent_address", StringType(), True),
            StructField("current_address", StringType(), True),
        ])

        # Define the main schema with a nested structure for address
        self.employee_schema = StructType([
            StructField("unique_id", StringType(), True),
            StructField("name", StringType(), True),
            StructField("age", IntegerType(), True),
            StructField("address", self.address_schema, True)
        ])

        self.data = [
            ("A001", 'Harsha', 20, ("Pune", "Mumbai")),
            ("A002", 'Shyam', 26, ("Pune", "Mumbai")),
            ("A003", 'Radha', 22, ("Pune", "Nashik")),
            ("A004", 'Sangeet', 19, ("Ahmedabad", "Mumbai"))
        ]

    def create_sparkdataframe(self):
        # Bind the sample data to the nested employee schema
        emp_df = spark.createDataFrame(data=self.data, schema=self.employee_schema)
        return emp_df

if __name__ == '__main__':
    emp_obj = CreateSparkDataFrame()
    emp_df = emp_obj.create_sparkdataframe()
    emp_df.show()
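
Once you have emp_df, the nested struct can be queried with dot notation, or flattened with address.* (a small usage sketch, not part of the original listing):

# Pull an individual nested field with dot notation
emp_df.select("name", "address.permanent_address").show()

# Or flatten the whole struct into top-level columns
emp_df.select("unique_id", "name", "age", "address.*").show()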

Happy Coding !! 🙂
