Python Spark Tutorial – How to Add a Schema to a DataFrame Using StructField and StructType?

Problem statement:

When the raw data does not carry any attribute names or data types and you read it into a Spark DataFrame, Spark assigns default column names and infers the data types on its own. In the solution below we will see how to define a schema and bind that metadata to the raw data.

Solution:

Spark provides the StructType and StructField classes, which are used to define the schema (the metadata) for raw data that is missing attribute names. The schema is constructed with StructType and StructField, and is then bound to the data while creating the DataFrame.

Here is the raw data, without any schema information such as attribute names or data types.

self.raw_data = [
    ("Neha", "B", "Sharma", "10002", "F", 6000),
    ("Harsha", "", "N", "10005", "F", 4500),
    ("Vikram", "", "M", "10003", "M", 5000),
    ("Ananya", "Reddy", "Verma", "10004", "F", 7000),
    ("Aarav", "Kumar", "Patel", "10001", "M", 5500)
]
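Before binding any schema, it helps to see what Spark does by default. Below is a minimal sketch (assuming a SparkSession is already available as `spark`) that creates a DataFrame from a couple of these rows without a schema; Spark falls back to generic column names and inferred types.

# Minimal sketch, assuming an existing SparkSession named `spark`
sample_rows = [
    ("Neha", "B", "Sharma", "10002", "F", 6000),
    ("Aarav", "Kumar", "Patel", "10001", "M", 5500)
]

df_no_schema = spark.createDataFrame(sample_rows)
df_no_schema.printSchema()
# Spark typically assigns generic column names and inferred types, e.g.:
# root
#  |-- _1: string (nullable = true)
#  |-- _2: string (nullable = true)
#  |-- _3: string (nullable = true)
#  |-- _4: string (nullable = true)
#  |-- _5: string (nullable = true)
#  |-- _6: long (nullable = true)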

Now we will create the metadata for the raw data above using the StructType and StructField classes. These classes live in the pyspark.sql.types module, so we need to import them before using them in our program.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

The code below constructs the metadata/schema for the raw data. We use the StructType class together with StructField to define each attribute's name and data type; the data type classes themselves are also imported from the pyspark.sql.types module.

# Here we are trying to give the Schema definition to the data
self.schema = StructType([
    StructField("first_name", StringType(), True),
    StructField("middle_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("unique_id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary_in_doller", IntegerType(), True)
])
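As a side note, the same schema can also be built field by field with StructType's add() method; this is simply an equivalent alternative to the list-of-StructField form used above, not something the rest of the code depends on.

# Equivalent schema built incrementally with StructType.add()
alt_schema = (StructType()
              .add("first_name", StringType(), True)
              .add("middle_name", StringType(), True)
              .add("last_name", StringType(), True)
              .add("unique_id", StringType(), True)
              .add("gender", StringType(), True)
              .add("salary_in_doller", IntegerType(), True))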

Once the raw data and the schema are ready, it is time to bind the schema to the data while creating the DataFrame. The code below does exactly that: the schema argument is set to the schema object created in the snippet above while reading data from various sources to form a Spark DataFrame.

Here are the three examples (the code follows below):

  • one with the raw data (a Python list) with the schema bound to it
  • a second with CSV data bound to the schema
  • a third with JSON file data bound to the schema

def convert_raw_data_to_df(self):
    print("Converting Data from list to DF with schema")
    df = spark.createDataFrame(data=self.raw_data, schema=self.schema)
    return df

def convert_csv_to_df(self, file_name):
    print("Converting Data from CSV to DF with schema")
    file = os.path.join('SourceFiles', file_name)
    df = spark.read.csv(file, sep='|', header=True, schema=self.schema)
    return df

def convert_json_to_df(self, file_name):
    print("Converting Data from JSON to DF with schema")
    file = os.path.join('SourceFiles', file_name)
    df = spark.read.schema(self.schema).option("multiline", "true").json(path=file)
    df.printSchema()
    return df
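The three methods above refer to `self`, `spark`, and `os`, so they are meant to live inside a class that has a SparkSession available. Below is a minimal sketch of how such a class might be wired up end to end; the class name `SchemaBinder`, the SparkSession setup, and the sample file name are assumptions for illustration, not part of the original code.

import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Module-level SparkSession, since the methods above refer to `spark` directly
spark = SparkSession.builder.appName("StructTypeSchemaDemo").getOrCreate()


class SchemaBinder:  # hypothetical class name, not from the original post
    def __init__(self):
        # Same style of raw data and schema as shown earlier in the post
        self.raw_data = [
            ("Neha", "B", "Sharma", "10002", "F", 6000),
            ("Aarav", "Kumar", "Patel", "10001", "M", 5500)
        ]
        self.schema = StructType([
            StructField("first_name", StringType(), True),
            StructField("middle_name", StringType(), True),
            StructField("last_name", StringType(), True),
            StructField("unique_id", StringType(), True),
            StructField("gender", StringType(), True),
            StructField("salary_in_doller", IntegerType(), True)
        ])

    def convert_raw_data_to_df(self):
        return spark.createDataFrame(data=self.raw_data, schema=self.schema)

    def convert_csv_to_df(self, file_name):
        file = os.path.join('SourceFiles', file_name)
        return spark.read.csv(file, sep='|', header=True, schema=self.schema)


binder = SchemaBinder()
binder.convert_raw_data_to_df().show()
# binder.convert_csv_to_df("employees.csv").show()  # 'employees.csv' is a placeholder file name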

Output: 

Raw data used for the CSV file is:
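The original screenshot of the CSV file is not reproduced here; based on the sep='|' and header=True options used above, a matching pipe-delimited file would look roughly like this (the exact contents and row order are an assumption):

first_name|middle_name|last_name|unique_id|gender|salary_in_doller
Neha|B|Sharma|10002|F|6000
Harsha||N|10005|F|4500
Vikram||M|10003|M|5000
Ananya|Reddy|Verma|10004|F|7000
Aarav|Kumar|Patel|10001|M|5500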

Raw data used in the JSON file is (first two records shown):

[
  {
    "first_name": "Neha",
    "middle_name": "B",
    "last_name": "Sharma",
    "unique_id": "10002",
    "gender": "F",
    "salary_in_doller": 6000
  },
  {
    "first_name": "Harsha",
    "middle_name": "",
    "last_name": "N",
    "unique_id": "10005",
    "gender": "F",
    "salary_in_doller": 4500
  }
]
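Since convert_json_to_df calls printSchema(), the console output for the bound schema should look roughly like this (the standard Spark output for the schema defined above):

root
 |-- first_name: string (nullable = true)
 |-- middle_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- unique_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary_in_doller: integer (nullable = true)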

In the next post we will look at the problem that arises when the schema is defined with field names that differ from those in the source feed.

Happy Coding
