Big Data
PySpark
           Instructor: Trong-Hop Do
                             April 24th 2021
               S3Lab
               Smart Software System Laboratory
      “Big data is at the foundation of all the megatrends that are happening today, from social to mobile to cloud to gaming.”
                    – Chris Lynch, Vertica Systems
Install Spark on Windows
Install Java 8 or Later
● To install Apache Spark on Windows, you need Java 8 or later. Download Java from Oracle and install it on your system.
● https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
Apache Spark Installation on Windows
● Download Apache Spark
● https://spark.apache.org/downloads.html
● Extract the downloaded archive to any folder
Environment Variables Setting
● Open the System Environment Variables window and select Environment Variables.
● Now edit the PATH variable.
● Select New and add the bin locations of Spark, Java, and Hadoop.
Test the Apache Spark shell
Run PySpark on PySpark Shell
  Type pyspark at the command prompt
Run PySpark on JupyterLab
Run PySpark on Google Colab
Big Data Analytics with PySpark SQL
What is PySpark
PySpark Modules and Packages
RDD vs DataFrame vs DataSet
  In Spark 2.0, the Dataset and DataFrame APIs were unified to provide a single API for developers. A DataFrame is a Dataset[T] with T = Row (that is, Dataset[Row]), so DataFrame shares the same methods as Dataset. The typed Dataset API is available only in Scala and Java; PySpark exposes DataFrames.
What is SparkSession?
● Since Spark 2.0, SparkSession has been the entry point to PySpark for working with RDDs and DataFrames. Before 2.0, SparkContext was the entry point.
● SparkSession also brings together the APIs that were previously available in different contexts:
     ○   SparkContext,
     ○   SQLContext,
     ○   StreamingContext,
     ○   HiveContext.
SparkSession in PySpark shell
● By default, the PySpark shell provides a “spark” object, which is an instance of the
  SparkSession class. You can use this object directly wherever it is needed in the
  pyspark shell.
Create SparkSession in JupyterLab
SparkSession Commonly Used Methods
version – Returns the Spark version your application is running on, typically the Spark version your cluster is configured with.
createDataFrame() – Creates a DataFrame from a local collection or an RDD.
getActiveSession() – Returns the active SparkSession.
read – Returns a DataFrameReader, which is used to read records from CSV, Parquet, Avro, and other file formats into a DataFrame.
readStream – Returns a DataStreamReader, which is used to read streaming data into a DataFrame.
sparkContext – Returns the underlying SparkContext.
sql() – Executes the given SQL query and returns the result as a DataFrame.
sqlContext() – Returns the SQLContext.
stop() – Stops the underlying SparkContext.
table() – Returns the specified table or view as a DataFrame.
udf – Returns a UDFRegistration, which is used to register Python UDFs so they can be called in SQL queries.
Create RDD using sparkContext.parallelize()
                              You can create an RDD by using the parallelize()
                              function of SparkContext
                              (sparkContext.parallelize()). This function
                              distributes an existing collection from your
                              driver program into a parallelized RDD. It is the
                              most basic way to create an RDD and is used when
                              the data is already in memory, whether it was
                              loaded from a file or from a database. It
                              requires all of the data to be present in the
                              driver program before the RDD is created.
Create RDD using sparkContext.textFile()
PySpark RDD Operations
● RDD transformations – Transformations are lazy operations: instead of updating an
   RDD, they return another RDD.
● RDD actions – Operations that trigger computation and return non-RDD values.
● Because transformations are lazy, they do not execute until you call an action on the
   RDD. Common RDD transformations include flatMap(), map(), reduceByKey(), filter(),
   and sortByKey(); each returns a new RDD instead of updating the current one.
RDD transformation: flatMap
flatMap – The flatMap() transformation applies a function to each record and flattens the
results into a new RDD. For example, splitting each record by space and then flattening the
result produces an RDD with a single word in each record.
RDD transformation: map
map – The map() transformation applies a function to every record; it is used for operations
such as adding or updating a column. The output of map() always has the same number of
records as the input.
RDD transformation: reduceByKey
reduceByKey – reduceByKey() merges the values for each key using the specified function. In a
word count, for example, applying a sum function to the values of each word produces an RDD
that contains each unique word and its count.
RDD transformation: sortByKey
RDD transformation: filter
RDD functions
https://spark.apache.org/docs/latest/api/python/reference/pyspark.html
Exercises
•     Show the ID and all game types played by customers who play “Water Sports”.
Hint: use reduceByKey() to concatenate the game types of each customer ID, then apply filter(). To remove
duplicate game types for each ID, use the distinct() function.
•     Other exercises
1.    Show IDs and number of transactions of each customer
2.    Show IDs and number of transactions of each customer, sorted by customer ID
3.    Show IDs and total cost of transactions of each customer, sorted by total cost
4.    Show ID, number of transactions, and total cost for each customer, sorted by customer ID
5.    Show name, number of transactions, and total cost for each customer, sorted by total cost
6.    Show ID, name, game types played by each customer
7.    Show ID, name, game types of all players who play 5 or more game types
8.    Show the names of all distinct players of each game type
9.    Show all game types that have no players under 40
10.   Show min, max, average age of players of all game types
Create DataFrame from RDD
Using toDF() function
Using createDataFrame() from SparkSession
●   Calling createDataFrame() from SparkSession is another way to create a PySpark DataFrame
    manually. It takes an RDD (or a list) object as an argument, and you can chain it with toDF()
    to specify the column names.
Create DataFrame from List Collection
Using createDataFrame() from SparkSession
Using createDataFrame() with the Row type
createDataFrame() has another signature in PySpark that takes a collection of Row objects and a
schema of column names as arguments. To use it, first convert the “data” object from a list into a
list of Row objects.
Create DataFrame with schema
If you want to specify the column names along with their data types, first create a StructType
schema and then pass it when creating the DataFrame.
Create DataFrame from Data sources
Creating DataFrame from CSV
● Alternatively, you can name the data source explicitly with format() and read the file with load().
Creating DataFrame from CSV - Using Header Record For Column Names
Creating DataFrame from CSV - Read Multiple CSV Files
● df = spark.read.csv(["path1", "path2", "path3"])
Creating DataFrame from CSV - Read all CSV Files in a Directory
● df = spark.read.csv("Folder path")
Creating DataFrame from CSV - Options While Reading CSV File
●   The delimiter option specifies the column delimiter of the CSV file. By default it is the comma
    (,) character, but it can be set to any character, such as pipe (|), tab (\t), or space.
●    inferSchema: The default value of this option is False. When it is set to True, Spark automatically
     infers column types from the data. Note that this requires reading the data one more time to
     infer the schema.
                       Why are all the columns strings?
●   header: This option reads the first line of the CSV file as column names. By default its value is
    False. Unless inferSchema or a schema is supplied, all column types are assumed to be string.
Creating DataFrame from CSV – user specified custom schema
●   We can specify a custom schema by calling schema() on spark.read (read.csv() also accepts a schema argument)
          s = spark.read.schema(user_schema)
●   Where user_schema is a
     ○    pyspark.sql.types.StructType object
     or
     ○    DDL-formatted string
        Creating DataFrame from CSV - StructType custom schema
from pyspark.sql.types import *
schema = StructType() \
   .add("RecordNumber",IntegerType(),True) \
   .add("Zipcode",IntegerType(),True) \
   .add("ZipCodeType",StringType(),True) \
   .add("City",StringType(),True) \
   .add("State",StringType(),True) \
   .add("LocationType",StringType(),True) \
   .add("Lat",DoubleType(),True) \
   .add("Long",DoubleType(),True) \
   .add("Xaxis",IntegerType(),True) \
   .add("Yaxis",DoubleType(),True) \
   .add("Zaxis",DoubleType(),True) \
   .add("WorldRegion",StringType(),True) \
   .add("Country",StringType(),True) \
   .add("LocationText",StringType(),True) \
   .add("Location",StringType(),True) \
   .add("Decommisioned",BooleanType(),True) \
   .add("TaxReturnsFiled",StringType(),True) \
   .add("EstimatedPopulation",IntegerType(),True) \
   .add("TotalWages",IntegerType(),True) \
   .add("Notes",StringType(),True)
df_with_schema = spark.read.format("csv").option("header", True).schema(schema).load("zipcodes.csv")
df_with_schema.printSchema()
    Creating DataFrame from CSV – DDL-formatted string custom schema
transDF = spark.read.options(delimiter=',') \
   .schema('trans_id INT, date STRING, cust_ID INT, amount DOUBLE, game STRING, equipment STRING, city STRING, state STRING, mode STRING') \
   .csv("trans.txt")
transDF.printSchema()
transDF.show()
Creating DataFrame from CSV - Write PySpark DataFrame to CSV file
● Use the write property of a PySpark DataFrame, which returns a DataFrameWriter, to write the DataFrame to a CSV file.
df.write.option("header", True).csv("newzipcodes")
●   While writing a CSV file you can use several options, for example header to output the DataFrame column
    names as a header record and delimiter to specify the delimiter of the CSV output file.
df2.write.options(header='True', delimiter=',').csv("newzipcodes")
●   Use mode('overwrite') to replace output that already exists:
df2.write.mode('overwrite').csv("newzipcodes")
# you can also use this
df2.write.format("csv").mode('overwrite').save("newzipcodes")
Creating DataFrame from text file
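 You can use .text(), which loads each line into a single string column named value, but .csv() is still much better for delimited data because it splits each line into columns.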
PySpark dataframe function
Select Columns From DataFrame
transDF = spark.read.options(delimiter=',') \
   .schema('trans_id INT, date STRING, cust_id INT, amount DOUBLE, game STRING, equipment STRING, city STRING, state STRING, mode STRING') \
   .csv("trans.txt")
PySpark withColumn()
● PySpark withColumn() is a DataFrame transformation function used to change the value of a column, convert
    the data type of an existing column, create a new column, and more.
●   You can use withColumn() to
      ○   Change DataType using PySpark withColumn()
      ○   Update The Value of an Existing Column
      ○   Create a Column from an Existing One
      ○   Add a New Column using withColumn()
      ○   Rename Column Name (with the related withColumnRenamed() function)
withColumn() - Change DataType
withColumn() - Update The Value of an Existing Column
withColumn() - Create a Column from an Existing
withColumn() - Add a New Column
withColumnRenamed() - Rename Column Name
 Where Filter Function | Multiple Conditions
from pyspark.sql.types import StructType,StructField
from pyspark.sql.types import StringType, IntegerType, ArrayType
data = [
   (("James","","Smith"),["Java","Scala","C++"],"OH","M"),
   (("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
   (("Julia","","Williams"),["CSharp","VB"],"OH","F"),
   (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
   (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"),
   (("Mike","Mary","Williams"),["Python","VB"],"OH","M")
 ]
schema = StructType([
    StructField('name', StructType([
       StructField('firstname', StringType(), True),
       StructField('middlename', StringType(), True),
       StructField('lastname', StringType(), True)
    ])),
    StructField('languages', ArrayType(StringType()), True),
    StructField('state', StringType(), True),
    StructField('gender', StringType(), True)
 ])
df = spark.createDataFrame(data = data, schema = schema)
df.printSchema()
df.show(truncate=False)
●   You can also filter DataFrame rows by using startswith(), endswith() and contains() methods of Column class.
Filter on an Array column
Filtering on Nested Struct columns
How about where()? where() is an alias for filter(), so both return the same rows.
 Get Distinct Rows (By Comparing All Columns)
Distinct of Selected Multiple Columns
Sort()
    groupBy() and aggregate functions
•    The groupBy() function collects identical data into groups on a DataFrame so that aggregate
     functions can be applied to the grouped data.
•    Aggregate functions operate on a group of rows and calculate a single return value for every group:
          •   approx_count_distinct    •   grouping              •   skewness
          •   avg                      •   first                 •   stddev
          •   collect_list             •   last                  •   stddev_samp
          •   collect_set              •   kurtosis              •   stddev_pop
          •   countDistinct            •   max                   •   sum
          •   count                    •   min                   •   sumDistinct
                                       •   mean                  •   variance
●   Aggregate function: agg()
●   Many aggregate functions can be used only inside agg()
●   collect_list() and collect_set()
PySpark built-in function
●   https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html
●   array_contains()
●   concat_ws()
●   split()
●   size()
●   element_at()
●   explode()
●   substring()
PySpark column function
● alias()
● isin()
● cast()
PySpark SQL JOIN
(Diagrams of join types, including cross join A×B and left semi join)
● Cross join returns the Cartesian product of two DataFrames.
PySpark function
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html
PySpark SQL tutorial 1
PySpark SQL tutorial 2
PySpark SQL tutorial 3
Run PySpark on Kaggle Notebook
●   Click Code -> New Notebook
●   Make sure Internet is turned on in the Kaggle Notebook settings
●   Run !pip install pyspark
  Q&A
           Thank you for your attention
           We hope we will achieve success together.