Python Spark Tutorial – How to use Spark Session and Context to create Spark DataFrame in OOPs way?


Spark DataFrame creation can be grouped into three approaches:

  1. Creating a Spark DataFrame from a list of dictionaries
  2. Creating a Spark DataFrame from an RDD
  3. Creating a Spark DataFrame from various sources

In this post we will focus on creating a Spark DataFrame using the first two approaches. Before that, let's first create

  1. Spark Session and
  2. Spark Context (For RDD)

For that we create a file, spark_session_context.py, which we will import into the main file. In it we build the SparkSession object with the master set to "local" and the appName "FirstProgram".

import findspark
findspark.init()  # locate the Spark installation; must run before importing pyspark

from pyspark.sql import SparkSession

# build (or reuse) a session running against a local master
spark = SparkSession.builder\
                    .master("local")\
                    .appName("FirstProgram")\
                    .getOrCreate()
sc = spark.sparkContext  # the SparkContext, needed for RDD operations

Now let's create another file where we import the session and context objects and create the DataFrames.

from spark_session_context import *

CONVERT_TYPE = ['dict_to_df', 'rdd_to_df', 'source_to_df']
class CreateSparkDataFrame:
    def __init__(self):
        self.dictionary_list = [{"Type": 'X', "Code": 5, "Amount": 221.87, "Validity": True},
                          {"Type": 'Y', "Code": 6, "Amount": 150.75, "Validity": False},
                          {"Type": 'Z', "Code": 7, "Amount": 5.25, "Validity": None},
                          {"Type": 'W', "Code": 8, "Amount": 77.42, "Validity": True}
                          ]
        self.source_type = ['txt', 'csv', 'json', 'xml']

    def convert_to_dictionary(self, type):
        if type not in CONVERT_TYPE:
            raise ValueError(f"type must be one of {CONVERT_TYPE}")

        if type == 'dict_to_df':
            return self.convert_dict_to_df(self.dictionary_list)
        elif type == 'rdd_to_df':
            return self.convert_rdd_to_df(self.dictionary_list)
        elif type == 'source_to_df':
            return self.convert_source_to_df(self.source_type)

Let’s see how we approached the solution:

  • Step 1) First we import the session and context objects by importing the spark_session_context module.
  • Step 2) Then we create a class whose constructor defines the list of dictionaries we will use for creating the DataFrame and the RDD.
  • Step 3) Next we create a method convert_to_dictionary(self, type), which takes a string parameter naming the type of conversion to perform.
  • Step 4) It then evaluates the type and calls the appropriate method to create the DataFrame.
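The if/elif chain in Step 4 works, but as more conversion types get added, a dictionary dispatch can scale more cleanly. A sketch of that design choice (the handler bodies here are stand-ins that just report which path was taken, not the real conversion code):

```python
CONVERT_TYPE = ['dict_to_df', 'rdd_to_df', 'source_to_df']


class DataFrameDispatcher:
    """Sketch of the dispatch logic only; convert_* methods are stubs."""

    def convert_dict_to_df(self):
        return 'created via createDataFrame'

    def convert_rdd_to_df(self):
        return 'created via parallelize + toDF'

    def convert_source_to_df(self):
        return 'created via spark.read'

    def convert(self, kind):
        # one dictionary entry per supported conversion type
        handlers = {
            'dict_to_df': self.convert_dict_to_df,
            'rdd_to_df': self.convert_rdd_to_df,
            'source_to_df': self.convert_source_to_df,
        }
        if kind not in handlers:
            raise ValueError(f"kind must be one of {CONVERT_TYPE}")
        return handlers[kind]()
```

Adding a fourth conversion then means adding one method and one dictionary entry, with no change to the branching logic.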

Methods which convert the list to a Spark DataFrame:

  1. List of dictionaries to Spark DataFrame
  2. List of dictionaries to RDD and then to Spark DataFrame

The method below uses the SparkSession object, calling its createDataFrame method with the dictionary list to create the Spark DataFrame.

def convert_dict_to_df(self, dict_list):
    df = spark.createDataFrame(dict_list)
    return df

The method below uses the SparkContext instead: it first creates an RDD with parallelize, then calls the RDD's toDF method to convert it into a Spark DataFrame.

def convert_rdd_to_df(self, dict_list):
    rdd = sc.parallelize(dict_list)
    df = rdd.toDF()
    return df

Full Code:

from spark_session_context import *

CONVERT_TYPE = ['dict_to_df', 'rdd_to_df', 'source_to_df']


class CreateSparkDataFrame:
    def __init__(self):
        self.dictionary_list = [{"Type": 'X', "Code": 5, "Amount": 221.87, "Validity": True},
                                {"Type": 'Y', "Code": 6, "Amount": 150.75, "Validity": False},
                                {"Type": 'Z', "Code": 7, "Amount": 5.25, "Validity": None},
                                {"Type": 'W', "Code": 8, "Amount": 77.42, "Validity": True}
                                ]
        self.source_type = ['txt', 'csv', 'json', 'xml']

    def convert_to_dictionary(self, type):
        if type not in CONVERT_TYPE:
            raise ValueError(f"type must be one of {CONVERT_TYPE}")

        if type == 'dict_to_df':
            return self.convert_dict_to_df(self.dictionary_list)
        elif type == 'rdd_to_df':
            return self.convert_rdd_to_df(self.dictionary_list)
        elif type == 'source_to_df':
            return self.convert_source_to_df(self.source_type)

    def convert_dict_to_df(self, dict_list):
        df = spark.createDataFrame(dict_list)
        return df

    def convert_rdd_to_df(self, dict_list):
        rdd = sc.parallelize(dict_list)
        df = rdd.toDF()
        return df

if __name__ == '__main__':
    obj_create_df = CreateSparkDataFrame()
    df_from_dictionary = obj_create_df.convert_to_dictionary('dict_to_df')
    df_from_dictionary.show()

    df_from_rdd = obj_create_df.convert_to_dictionary('rdd_to_df')
    df_from_rdd.show()

In the next post we will see how to create DataFrames from various sources like

  • CSV
  • XML
  • JSON

Till then Happy Coding 
