PySpark 3.0 Quick Reference Guide
What is Apache Spark? PySpark Catalog (spark.catalog) • Distributed Function
‒ foreach()
• Open-source cluster computing framework • cacheTable() ‒ foreachPartition()
• Fully scalable and fault-tolerant • clearCache()
• Simple APIs for Python, SQL, Scala, and R • createTable() PySpark DataFrame Transformations
• Seamless streaming and batch applications • createExternalTable() • Grouped Data
• Built-in libraries for data access, streaming, • currentDatabase() ‒ cube()
data integration, graph processing, and • dropTempView() ‒ groupBy()
advanced analytics / machine learning • listDatabases() ‒ pivot()
• listTables() ‒ cogroup()
Spark Terminology • listFunctions() • Stats
• listColumns() ‒ approxQuantile()
• Driver: the local process that manages the • isCached() ‒ corr()
Spark session and returned results
• recoverPartitions() ‒ count()
• Workers: computer nodes that perform • refreshTable() ‒ cov()
parallel computation • refreshByPath() ‒ crosstab()
• Executors: processes on worker nodes • registerFunction() ‒ describe()
that do the parallel computation • setCurrentDatabase() ‒ freqItems()
• Action: is either an instruction to return • uncacheTable() ‒ summary()
something to the driver or to output data to PySpark Data Sources API • Column / cell control
a file system or database ‒ drop() # drops columns
• Input Reader / Streaming Source ‒ fillna() # alias for na.fill()
• Transformation: is anything that isn’t an (spark.read, spark.readStream) ‒ replace()
action and is performed in a lazy fashion ‒ select(), selectExpr()
‒ load() ‒ withColumn()
• Map: indicates operations that can run in a ‒ schema() ‒ withColumnRenamed()
row independent fashion ‒ table() ‒ colRegex()
• Reduce: indicates operations that have • Output Writer / Streaming Sink • Row control
intra-row dependencies (df.write, df.writeStream)
‒ bucketBy() ‒ asc()
• Shuffle: is the movement of data between ‒ insertInto() ‒ asc_nulls_first()
executors to run a Reduce operation ‒ mode() ‒ asc_nulls_last()
• RDD: Resilient Distributed Dataset is ‒ outputMode() # streaming ‒ desc()
the legacy in-memory data format ‒ partitionBy() ‒ desc_nulls_first()
• DataFrame: a flexible object-oriented ‒ save() ‒ desc_nulls_last()
data structure that has a row/column ‒ saveAsTable() ‒ distinct()
schema ‒ sortBy() ‒ dropDuplicates()
‒ start() # streaming ‒ dropna() # alias for na.drop()
• Dataset: a DataFrame-like data structure ‒ trigger() # streaming ‒ filter()
that doesn’t have a row/column schema • Common Input / Output ‒ limit()
‒ csv() • Sorting
Spark Libraries ‒ format() ‒ asc()
• ML: is the machine learning library with ‒ jdbc() ‒ asc_nulls_first()
tools for statistics, featurization, evaluation, ‒ json() ‒ asc_nulls_last()
‒ parquet()
classification, clustering, frequent item ‒ option(), options() ‒ desc()
mining, regression, and recommendation ‒ orc() ‒ desc_nulls_first()
• GraphFrames / GraphX: is the graph ‒ text() ‒ desc_nulls_last()
analytics library ‒ sort()/orderBy()
• Structured Streaming: is the library that Structured Streaming ‒ sortWithinPartitions()
handles real-time streaming via micro- • StreamingQuery • Sampling
batches and unbounded DataFrames ‒ awaitTermination() ‒ sample()
‒ exception() ‒ sampleBy()
Spark Data Types ‒ explain() ‒ randomSplit()
• Strings ‒ foreach() • NA (Null/Missing) Transformations
‒ StringType ‒ foreachBatch() ‒ na.drop()
• Dates / Times ‒ id ‒ na.fill()
‒ DateType ‒ isActive ‒ na.replace()
‒ TimestampType ‒ lastProgress • Caching / Checkpointing / Pipelining
• Numeric ‒ name ‒ checkpoint()
‒ DecimalType ‒ processAllAvailable() ‒ localCheckpoint()
‒ DoubleType ‒ recentProgress ‒ persist(), unpersist()
‒ FloatType ‒ runId ‒ withWatermark() # streaming
‒ ByteType ‒ status ‒ toDF()
‒ IntegerType ‒ stop() ‒ transform()
‒ LongType • StreamingQueryManager (spark.streams) • Joining
‒ ShortType ‒ active
• Complex Types ‒ awaitAnyTermination() ‒ broadcast()
‒ ArrayType ‒ get() ‒ join()
‒ MapType ‒ resetTerminated() ‒ crossJoin()
‒ StructType ‒ exceptAll()
‒ StructField PySpark DataFrame Actions ‒ hint()
• Other • Local (driver) Output ‒ intersect(),intersectAll()
‒ BooleanType ‒ collect() ‒ subtract()
‒ BinaryType ‒ show() ‒ union()
‒ NullType (None) ‒ toJSON() ‒ unionByName()
‒ toLocalIterator() • Python Pandas
PySpark Session (spark) ‒ toPandas() ‒ apply()
• spark.createDataFrame() ‒ take() ‒ pandas_udf()
• spark.range() ‒ tail() ‒ mapInPandas()
• spark.streams • Status Actions ‒ applyInPandas()
• spark.sql() ‒ columns • SQL
• spark.table() ‒ explain() ‒ createGlobalTempView()
• spark.udf ‒ isLocal() ‒ createOrReplaceGlobalTempView()
‒ isStreaming ‒ createOrReplaceTempView()
• spark.version ‒ printSchema()
• spark.stop() ‒ dtypes ‒ createTempView()
• Partition Control ‒ registerJavaFunction()
‒ repartition() ‒ registerJavaUDAF()
‒ repartitionByRange()
‒ coalesce()
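The Spark Terminology entries above (lazy transformations, actions, map, shuffle, reduce) can be illustrated without a cluster. What follows is a plain-Python sketch, not Spark code: the two in-memory "partitions" and the helper names map_rows, shuffle_by_key, and reduce_groups are hypothetical and exist only to show the order in which work happens.

```python
from collections import defaultdict

# Two hypothetical "partitions", standing in for data held by two executors.
partitions = [
    [("a", 1), ("b", 2), ("a", 3)],
    [("b", 4), ("c", 5)],
]

calls = []  # records each row at the moment it is actually processed


def map_rows(partition):
    """Map: a row-independent step, written lazily as a generator."""
    for key, value in partition:
        calls.append(key)
        yield key, value * 10


def shuffle_by_key(mapped_partitions):
    """Shuffle: move rows so that all rows with the same key sit together."""
    grouped = defaultdict(list)
    for part in mapped_partitions:
        for key, value in part:
            grouped[key].append(value)
    return grouped


def reduce_groups(grouped):
    """Reduce: combine rows that depend on each other (same key)."""
    return {key: sum(values) for key, values in grouped.items()}


# "Transformation": building the generators does no work yet.
lazy = [map_rows(p) for p in partitions]
work_done_before_action = len(calls)

# "Action": asking for a result forces the whole pipeline to run.
totals = reduce_groups(shuffle_by_key(lazy))
print(work_done_before_action, totals)  # prints: 0 {'a': 40, 'b': 60, 'c': 50}
```

No rows are touched until the final line asks for a result, which loosely mirrors how Spark defers transformations until an action such as collect() or show() is called.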
➢ Migration Solutions ➢ Technical Consulting
www.wisewithdata.com
➢ Analytical Solutions ➢ Education
PySpark DataFrame Functions • Date & Time • Collections (Arrays & Maps)
‒ add_months() ‒ array()
• Aggregations (df.groupBy()) ‒ current_date() ‒ array_contains()
‒ agg() ‒ current_timestamp() ‒ array_distinct()
‒ approx_count_distinct() ‒ date_add(), date_sub() ‒ array_except()
‒ count() ‒ date_format() ‒ array_intersect()
‒ countDistinct() ‒ date_trunc() ‒ array_join()
‒ mean() ‒ datediff() ‒ array_max(), array_min()
‒ min(), max() ‒ dayofweek() ‒ array_position()
‒ first(), last() ‒ dayofmonth() ‒ array_remove()
‒ grouping() ‒ dayofyear() ‒ array_repeat()
‒ grouping_id() ‒ from_unixtime() ‒ array_sort()
‒ kurtosis() ‒ from_utc_timestamp() ‒ array_union()
‒ skewness() ‒ hour() ‒ arrays_overlap()
‒ stddev() ‒ last_day(),next_day() ‒ arrays_zip()
‒ stddev_pop() ‒ minute() ‒ create_map()
‒ stddev_samp() ‒ month() ‒ element_at()
‒ sum() ‒ months_between() ‒ flatten()
‒ sumDistinct() ‒ quarter() ‒ map_concat()
‒ var_pop() ‒ second() ‒ map_entries()
‒ var_samp() ‒ to_date() ‒ map_from_arrays()
‒ variance() ‒ to_timestamp() ‒ map_from_entries()
• Column Operators ‒ to_utc_timestamp() ‒ map_keys()
‒ alias() ‒ trunc() ‒ map_values()
‒ between() ‒ unix_timestamp() ‒ sequence()
‒ contains() ‒ weekofyear() ‒ shuffle()
‒ eqNullSafe() ‒ window() ‒ size()
‒ isNull(), isNotNull() ‒ year() ‒ slice()
‒ isin() • String ‒ sort_array()
‒ isnan() ‒ concat() • Conversion
‒ like() ‒ concat_ws() ‒ base64(), unbase64()
‒ rlike() ‒ format_string() ‒ bin()
‒ getItem() ‒ initcap() ‒ cast()
‒ getField() ‒ instr() ‒ conv()
‒ startswith(), endswith() ‒ length() ‒ encode(), decode()
• Basic Math ‒ levenshtein() ‒ from_avro(), to_avro()
‒ abs() ‒ locate() ‒ from_csv(), to_csv()
‒ exp(),expm1() ‒ lower(), upper() ‒ from_json(), to_json()
‒ factorial() ‒ lpad(), rpad() ‒ get_json_object()
‒ floor(), ceil() ‒ ltrim(), rtrim() ‒ hex(), unhex()
‒ greatest(),least() ‒ overlay()
‒ pow() ‒ regexp_extract() PySpark Windowed Aggregates
‒ round(), bround() ‒ regexp_replace() • Window Operators
‒ rand() ‒ repeat() ‒ over()
‒ randn() ‒ reverse() • Window Specification
‒ sqrt(), cbrt() ‒ soundex() ‒ orderBy()
‒ log(), log2(), log10(), log1p() ‒ split() ‒ partitionBy()
‒ signum() ‒ substring() ‒ rangeBetween()
• Trigonometry ‒ substring_index() ‒ rowsBetween()
‒ cos(), cosh(), acos() ‒ translate() • Ranking Functions
‒ degrees() ‒ trim() ‒ ntile()
‒ hypot() • Hashes ‒ percent_rank()
‒ radians() ‒ crc32() ‒ rank(), dense_rank()
‒ sin(), sinh(), asin() ‒ hash() ‒ row_number()
‒ tan(), tanh(), atan(), atan2() ‒ md5() • Analytical Functions
• Multivariate Statistics ‒ sha1(), sha2() ‒ cume_dist()
‒ corr() ‒ xxhash64() ‒ lag(), lead()
‒ covar_pop() • Special • Aggregate Functions
‒ covar_samp() ‒ col() ‒ All of the listed aggregate functions
• Conditional Logic ‒ expr()
‒ coalesce() ‒ input_file_name()
‒ nanvl() ‒ lit()
‒ otherwise() ‒ monotonically_increasing_id()
‒ when() ‒ spark_partition_id()
• Formatting
‒ format_string()
‒ format_number()
• Row Creation
‒ explode(), explode_outer()
‒ posexplode(), posexplode_outer()
• Schema Inference
‒ schema_of_csv()
‒ schema_of_json()
• Window Specification Example
from pyspark.sql.functions import rank
from pyspark.sql.window import Window

windowSpec = \
    Window \
    .partitionBy(...) \
    .orderBy(...) \
    .rowsBetween(start, end)  # ROW window spec
# or, for a RANGE window spec, end the chain with:
#   .rangeBetween(start, end)

# example usage in a DataFrame transformation
df.withColumn('rank', rank(...).over(windowSpec))
©WiseWithData 2020-Version 3.0-0622