Python Spark Tutorial – How to use Spark Session and Context to create a Spark DataFrame from CSV, XML and JSON sources the OOP way (for XML we will use the Databricks spark-xml package)


Problem Statement:

Creating a Spark DataFrame from XML and JSON sources is not straightforward. For XML we need an external package, spark-xml, developed by Databricks; for JSON, Spark cannot read a formatted (pretty-printed) file directly, and an additional option has to be passed to the reader.

Solution:

Let’s see how we can read the two file formats below into a Spark DataFrame (we will also cover CSV further down):

  1. XML
  2. JSON

For XML, we first need to download the spark-xml jar from Maven Central. Its Scala version should match the Scala version of your Spark installation, which you can see from the jar file names in your Spark jars folder.

Here the Scala version is 2.12, so we need the spark-xml jar built for Scala 2.12. Once downloaded, keep the jar in the Spark jars directory.
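If you prefer not to copy the jar by hand, Spark can also pull the package from Maven when the session starts, via the spark.jars.packages config. A minimal sketch; the version number below is only an example, pick the spark-xml release that matches your Scala version:

from pyspark.sql import SparkSession

# Example only: the coordinate must match your Scala version (2.12 here),
# and the version should be whichever spark-xml release you chose.
spark = (SparkSession.builder
         .master("local")
         .appName("FirstProgram")
         .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.14.0")
         .getOrCreate())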

Once this is done, let's create the Spark context and session.

For that we create a file, spark_session_context.py, which we will import into the main file. Here we create the SparkSession object with master set to "local" and appName set to "FirstProgram".

import findspark
findspark.init()  # locate the Spark installation before importing pyspark

from pyspark.sql import SparkSession

# Single SparkSession shared by the rest of the tutorial
spark = SparkSession.builder\
                    .master("local")\
                    .appName('FirstProgram')\
                    .getOrCreate()

# SparkContext, exposed for the RDD-based examples
sc = spark.sparkContext
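Because spark and sc live in spark_session_context.py, any other script can reuse the same session simply by importing them. A quick sanity check might look like this:

# In the main script, reuse the session and context created above
from spark_session_context import spark, sc

print(spark.version)          # confirms the session is up
print(sc.defaultParallelism)  # confirms the SparkContext works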

Now we have a sample XML file, which we will try to read with the Spark read method.

<?xml version="1.0"?>
<catalog>
   <book id="b001">
      <author>Harsha, N</author>
      <title>Python Developers Guide</title>
      <genre>Computer</genre>
      <price>3000</price>
      <publish_date>2021-10-17</publish_date>
      <description>Comprehensive Guide to Python Developer
      with Code.</description>
   </book>
   <book id="boo2">
      <author>Vikram, M</author>
      <title>Python API Development</title>
      <genre>Computer</genre>
      <price>1000</price>
      <publish_date>2021-12-01</publish_date>
      <description>Learn and code Python API using FastAPI
      </description>
   </book>
</catalog>

Here we write a method to read the XML above. For reading we define a row tag, which tells Spark where each row starts; in the example above the book tag defines a row, so rowTag = 'book'. Below is the snippet.

def read_data_from_xml(self, file_name):
    file = os.path.join('SourceFiles', file_name)
    # rowTag='book' makes each <book> element one row of the DataFrame
    df = spark.read \
        .format('com.databricks.spark.xml') \
        .options(rowTag='book') \
        .load(file)
    return df

Now, after submitting the job, the resulting DataFrame has one row per book element.
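A quick way to check the result is to print the schema and a few columns. This is only a sketch (df here is whatever read_data_from_xml returned); with spark-xml's default settings, the id attribute of book typically shows up as a column named _id:

df.printSchema()   # one field per child element of <book>
df.select('author', 'title', 'price').show(truncate=False)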

JSON to PySpark DataFrame: converting JSON data to a PySpark DataFrame is very simple. The only thing to keep in mind is the _corrupt_record column that appears by default, which shows up because Spark needs the JSON in a specific format to read it properly: either each line of the file should be a self-contained JSON object, or the entire JSON should sit on one line. For a pretty-printed file we pass the multiline option instead.

Here is the data we will convert to a Spark DataFrame:

{
    "fruit": "Apple",
    "size": "Large",
    "color": "Red"
}

Below is the code snippet to convert the JSON to a Spark DataFrame:

def read_data_from_json(self, file_name):
    file = os.path.join('SourceFiles', file_name)
    # multiline lets Spark parse a JSON object that spans several lines
    df = spark.read.option("multiline", "true").json(file)
    return df
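For contrast, here is a sketch of what happens if we skip the multiline option on the same pretty-printed file: Spark treats each physical line as a separate JSON record, fails to parse them, and usually gives back only the _corrupt_record column.

# Same path as in read_data_from_json, but without multiline
df_bad = spark.read.json(file)
df_bad.printSchema()   # typically shows just: _corrupt_record: string

# Builder-style equivalent of the working call above
df_ok = spark.read.format('json').option('multiline', 'true').load(file)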

The output of the above code block is a single-row DataFrame with one column per JSON field.

CSV to Spark DataFrame: for a CSV file we have the spark.read.csv method, which takes the file name as an input parameter. You can also pass the delimiter, and if your CSV file has a header row you can indicate that with a parameter as well. Below is a code snippet that reads a pipe-delimited source file with a header row, so we pass header=True.

def read_data_from_csv(self, file_name):
    file = os.path.join('SourceFiles', file_name)
    # pipe-delimited file; header=True uses the first row as column names
    df = spark.read.csv(file, sep='|', header=True)
    # df = spark.read.option('header', 'true').csv(file)
    return df

This is our CSV file with a few lines of data.
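For illustration, a pipe-delimited file with a header row could look like the sample below (these values are made up for this example; the actual file contents are not shown here):

Type|Code|Amount|Validity
X|5|221.87|true
Y|6|150.75|false
Z|7|5.25|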

Full Program:

from spark_session_context import *
import os
script_dir = os.path.dirname(__file__)

CONVERT_TYPE = ['dict_to_df', 'rdd_to_df', 'source_to_df']
# SOURCE_FILEPATH = script_dir + r'/SourceFiles/'


class CreateSparkDataFrame:
    def __init__(self):
        self.dictionary_list = [{"Type": 'X', "Code": 5, "Amount": 221.87, "Validity": True},
                                {"Type": 'Y', "Code": 6, "Amount": 150.75, "Validity": False},
                                {"Type": 'Z', "Code": 7, "Amount": 5.25, "Validity": None},
                                {"Type": 'W', "Code": 8, "Amount": 77.42, "Validity": True}
                                ]
        self.source_type = ['txt', 'csv', 'dat', 'json', 'xml']

    def convert_to_dictionary(self, convert_type, source_type=None, file_name=None):
        # Dispatches to the right conversion based on convert_type
        # ('dict_to_df', 'rdd_to_df' or 'source_to_df')
        if convert_type not in CONVERT_TYPE:
            raise Exception(f"the type should be one of {CONVERT_TYPE}")

        if convert_type == 'dict_to_df':
            return self.convert_dict_to_df(self.dictionary_list)
        elif convert_type == 'rdd_to_df':
            return self.convert_rdd_to_df(self.dictionary_list)
        elif convert_type == 'source_to_df':
            return self.convert_source_to_df(source_type, file_name)

    def convert_dict_to_df(self, dict_list):
        df = spark.createDataFrame(dict_list)
        return df

    def convert_rdd_to_df(self, dict_list):
        rdd = sc.parallelize(dict_list)
        df = rdd.toDF()
        return df

    def convert_source_to_df(self, source_type, file_name):
        if source_type not in self.source_type:
            raise Exception(f"Source supported are {self.source_type}")
        if source_type == 'csv':
            df = self.read_data_from_csv(file_name)
        elif source_type in ('txt', 'dat'):
            # txt and dat readers are not covered in this post
            raise NotImplementedError(f"{source_type} reader is not implemented yet")
        elif source_type == 'json':
            df = self.read_data_from_json(file_name)
        elif source_type == 'xml':
            df = self.read_data_from_xml(file_name)
        else:
            df = None
        return df

    def read_data_from_csv(self, file_name):
        file = os.path.join('SourceFiles', file_name)
        df = spark.read.csv(file, sep='|', header=True)
        # df = spark.read.option('header', 'true').csv(file)
        return df

    def read_data_from_json(self, file_name):
        file = os.path.join('SourceFiles', file_name)
        df = spark.read.option("multiline","true").json(file)
        return df

    def read_data_from_xml(self, file_name):
        file = os.path.join('SourceFiles', file_name)
        df = spark.read \
            .format('com.databricks.spark.xml') \
            .options(rowTag='book') \
            .load(file)
        return df


if __name__ == '__main__':
    obj_create_df = CreateSparkDataFrame()
    df_from_dictionary = obj_create_df.convert_to_dictionary('dict_to_df')
    df_from_dictionary.show()

    df_from_rdd = obj_create_df.convert_to_dictionary('rdd_to_df')
    df_from_rdd.show()

    file_name = 'data1.csv'
    df_from_source = obj_create_df.convert_to_dictionary('source_to_df', 'csv', file_name)
    df_from_source.show()

    file_name = 'data2.json'
    df_from_source = obj_create_df.convert_to_dictionary('source_to_df', 'json', file_name)
    df_from_source.show()

    file_name = 'data3.xml'
    df_from_source = obj_create_df.convert_to_dictionary('source_to_df', 'xml', file_name)
    df_from_source.show()
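One optional tweak to the full program: recent PySpark versions warn that inferring a DataFrame schema from plain dicts (as convert_rdd_to_df does via rdd.toDF()) is deprecated and recommend Row objects instead. A possible drop-in variation of that method, assuming the same class and sc as above:

from pyspark.sql import Row

    def convert_rdd_to_df(self, dict_list):
        # Wrap each dict in a Row so PySpark does not rely on the
        # deprecated dict-based schema inference
        rdd = sc.parallelize(dict_list).map(lambda d: Row(**d))
        return rdd.toDF()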

Till the next post, Happy Day!
