SQL & PYSPARK EQUIVALENT

SELECT
  SQL:     SELECT column(s) FROM table
           SELECT * FROM table
  PySpark: df.select("column(s)")
           df.select("*")

DISTINCT
  SQL:     SELECT DISTINCT column(s) FROM table
  PySpark: df.select("column(s)").distinct()

WHERE
  SQL:     SELECT column(s) FROM table WHERE condition
  PySpark: df.filter(condition).select("column(s)")

ORDER BY
  SQL:     SELECT column(s) FROM table ORDER BY column(s)
  PySpark: df.sort("column(s)").select("column(s)")

LIMIT
  SQL:     SELECT column(s) FROM table LIMIT n
  PySpark: df.limit(n).select("column(s)")

COUNT
  SQL:     SELECT COUNT(*) FROM table
  PySpark: df.count()
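A minimal runnable sketch tying these basics together (assuming an active SparkSession named spark; the sample data is hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data standing in for "table"
df = spark.createDataFrame(
    [("Alice", 30), ("Bob", 25), ("Alice", 30)],
    ["name", "age"],
)

df.select("name").distinct().show()           # SELECT DISTINCT name FROM table
df.filter(df.age > 26).select("name").show()  # SELECT name FROM table WHERE age > 26
df.sort("age").limit(2).show()                # SELECT * FROM table ORDER BY age LIMIT 2
print(df.count())                             # SELECT COUNT(*) FROM table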
SUM
  SQL:     SELECT SUM(column) FROM table
  PySpark: from pyspark.sql.functions import sum; df.agg(sum("column"))

AVG
  SQL:     SELECT AVG(column) FROM table
  PySpark: from pyspark.sql.functions import avg; df.agg(avg("column"))

MAX / MIN
  SQL:     SELECT MAX(column) FROM table
  PySpark: from pyspark.sql.functions import max; df.agg(max("column"))
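A short sketch of these aggregations; note that importing sum/max/min by name shadows Python's built-ins, so importing the functions module under an alias is a common alternative (sample data is hypothetical):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(10,), (20,), (30,)], ["amount"])

# SELECT SUM(amount), AVG(amount), MAX(amount), MIN(amount) FROM table
df.agg(
    F.sum("amount").alias("total"),
    F.avg("amount").alias("average"),
    F.max("amount").alias("highest"),
    F.min("amount").alias("lowest"),
).show()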
String Length
  SQL:     SELECT LEN(string) FROM table
  PySpark: from pyspark.sql.functions import length; df.select(length(col("string")))

Convert to Uppercase
  SQL:     SELECT UPPER(string) FROM table
  PySpark: from pyspark.sql.functions import upper; df.select(upper(col("string")))

Convert to Lowercase
  SQL:     SELECT LOWER(string) FROM table
  PySpark: from pyspark.sql.functions import lower; df.select(lower(col("string")))
Concatenate Strings
  SQL:     SELECT CONCAT(string1, string2) FROM table
  PySpark: from pyspark.sql.functions import concat; df.select(concat(col("string1"), col("string2")))

Trim String
  SQL:     SELECT TRIM(string) FROM table
  PySpark: from pyspark.sql.functions import trim; df.select(trim(col("string")))

Substring
  SQL:     SELECT SUBSTRING(string, start, length) FROM table
  PySpark: from pyspark.sql.functions import substring; df.select(substring(col("string"), start, length))
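A combined sketch of the string functions above (assuming a SparkSession named spark; column names are hypothetical):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, length, lower, substring, trim, upper

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("  Girish ", "Gowda")], ["first", "last"])

df.select(
    trim(col("first")).alias("trimmed"),              # TRIM(first)
    upper(col("last")).alias("upper"),                # UPPER(last)
    lower(col("last")).alias("lower"),                # LOWER(last)
    length(col("last")).alias("len"),                 # LEN(last)
    concat(col("first"), col("last")).alias("full"),  # CONCAT(first, last)
    substring(col("last"), 1, 3).alias("sub"),        # SUBSTRING(last, 1, 3)
).show()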
CURDATE, NOW, CURTIME
  SQL:     SELECT CURDATE() FROM table
  PySpark: from pyspark.sql.functions import current_date; df.select(current_date())

CAST, CONVERT
  SQL:     SELECT CAST(column AS datatype) FROM table
  PySpark: df.select(col("column").cast("datatype"))

IF
  SQL:     SELECT IF(condition, value1, value2) FROM table
  PySpark: from pyspark.sql.functions import when
           df.select(when(condition, value1).otherwise(value2))
           (otherwise() is a Column method, not an importable function.)
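A sketch combining dates, casting, and conditionals (SparkSession and sample columns are assumptions):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, when

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("1", 17), ("2", 42)], ["id", "age"])

df.select(
    col("id").cast("int").alias("id_int"),  # CAST(id AS INT)
    current_date().alias("today"),          # CURDATE()
    when(col("age") >= 18, "adult")         # IF(age >= 18, 'adult', 'minor')
        .otherwise("minor").alias("group"),
).show()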
COALESCE
  SQL:     SELECT COALESCE(column1, column2, column3) FROM table
  PySpark: from pyspark.sql.functions import coalesce; df.select(coalesce("column1", "column2", "column3"))

JOIN
  SQL:     SELECT ... FROM table1 JOIN table2 ON table1.column = table2.column
  PySpark: df1.join(df2, "column")
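For instance, a minimal join sketch (DataFrames and column names are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
emp = spark.createDataFrame([(1, "Girish", 10)], ["id", "name", "dept_id"])
dept = spark.createDataFrame([(10, "Data")], ["dept_id", "dept_name"])

# INNER JOIN ON emp.dept_id = dept.dept_id (inner is the default join type)
emp.join(dept, "dept_id").show()
# Other types via the third argument: "left", "right", "full", "left_semi", ...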
GROUP BY
  SQL:     GROUP BY column(s)
  PySpark: df.groupBy("column(s)")

PIVOT
  SQL:     PIVOT (agg_function(column) FOR pivot_column IN (values))
  PySpark: df.groupBy("group_column(s)").pivot("pivot_column").agg(agg_function("column"))
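A runnable pivot sketch (the sales data is hypothetical): rows are grouped by year, one output column is created per distinct quarter, and cells hold SUM(amount).

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
sales = spark.createDataFrame(
    [("2023", "Q1", 100), ("2023", "Q2", 150), ("2024", "Q1", 200)],
    ["year", "quarter", "amount"],
)

sales.groupBy("year").pivot("quarter").agg(F.sum("amount")).show()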
Logical Operators
  SQL:     SELECT column FROM table WHERE column1 = value AND column2 > value
  PySpark: df.filter((col("column1") == value) & (col("column2") > value))

IS NULL, IS NOT NULL
  SQL:     SELECT column FROM table WHERE column IS NULL
  PySpark: df.filter(col("column").isNull()).select("column")
LIKE
  SQL:     SELECT column FROM table WHERE column LIKE 'value%'
  PySpark: df.filter(col("column").like("value%"))

BETWEEN
  SQL:     SELECT column FROM table WHERE column BETWEEN value1 AND value2
  PySpark: df.filter((col("column") >= value1) & (col("column") <= value2)).select("column")
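A sketch covering the filter predicates above (LIKE, BETWEEN via &, IS NULL; the data is hypothetical):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("apple", 5), ("apricot", None), ("banana", 12)],
    ["name", "qty"],
)

df.filter(col("name").like("ap%")).show()                 # WHERE name LIKE 'ap%'
df.filter((col("qty") >= 5) & (col("qty") <= 10)).show()  # WHERE qty BETWEEN 5 AND 10
df.filter(col("qty").isNull()).select("name").show()      # WHERE qty IS NULL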
UNION, UNION ALL
  SQL:     SELECT column FROM table1 UNION SELECT column FROM table2
  PySpark: df1.union(df2).select("column")
           Note: union() (and its deprecated alias unionAll()) keeps duplicates, matching UNION ALL; append .distinct() for SQL UNION semantics.
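A quick sketch of the distinction (hypothetical single-column DataFrames):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([(1,), (2,)], ["column"])
df2 = spark.createDataFrame([(2,), (3,)], ["column"])

df1.union(df2).show()             # UNION ALL: the duplicate 2 is kept
df1.union(df2).distinct().show()  # UNION: duplicates removed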
RANK, DENSE_RANK, ROW_NUMBER
  SQL:     SELECT column, RANK() OVER (ORDER BY column) AS rank FROM table
  PySpark: from pyspark.sql import Window; from pyspark.sql.functions import rank
           df.select("column", rank().over(Window.orderBy("column")).alias("rank"))
CTE
  SQL:     WITH cte1 AS (SELECT * FROM table1)
           SELECT * FROM cte1 WHERE condition
  PySpark: df.createOrReplaceTempView("cte1")
           df_cte1 = spark.sql("SELECT * FROM cte1 WHERE condition")
           or chain transformations directly: df.filter(condition1).filter(condition2)
DDL OPERATIONS
Datatypes
  SQL:     INT: for integer values
           BIGINT: for large integer values
           FLOAT: for floating point values
           DOUBLE: for double precision floating point values
           CHAR: for fixed-length character strings
           VARCHAR: for variable-length character strings
           DATE: for date values
           TIMESTAMP: for timestamp values
  PySpark: In PySpark the data types are similar but represented differently:
           IntegerType: for integer values
           LongType: for long integer values
           FloatType: for floating point values
           DoubleType: for double precision floating point values
           StringType: for character strings
           DateType: for date values
           TimestampType: for timestamp values
Create Table
  SQL:     CREATE TABLE table_name (column_name data_type constraint);
  PySpark: df.write.format("parquet").saveAsTable("table_name")
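A sketch of the round trip, assuming a writable warehouse location for the session's catalog (table and column names are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a")], ["id", "val"])

# Persist as a managed parquet table, then read it back from the catalog
df.write.format("parquet").mode("overwrite").saveAsTable("my_table")
spark.table("my_table").show()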
Create Table with Columns definition
  SQL:     CREATE TABLE table_name(
             column_name data_type [constraints],
             column_name data_type [constraints],
             ...);
  PySpark: from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DecimalType
           schema = StructType([
               StructField("id", IntegerType(), True),
               StructField("name", StringType(), False),
               StructField("age", IntegerType(), True),
               StructField("salary", DecimalType(10, 2), True)])
           df = spark.createDataFrame([], schema)
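A quick usage check of a schema like the one above (the row values are hypothetical; DecimalType columns expect Python Decimal objects):

from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DecimalType

spark = SparkSession.builder.getOrCreate()
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), True),
    StructField("salary", DecimalType(10, 2), True),
])

df = spark.createDataFrame([(1, "Girish", 30, Decimal("75000.00"))], schema)
df.printSchema()  # verify the column types match the declared schema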
Create Table with Primary Key
  SQL:     CREATE TABLE table_name(
             column_name data_type PRIMARY KEY,
             ...);
           If the table already exists:
           ALTER TABLE table_name ADD PRIMARY KEY (column_name);
  PySpark: In PySpark or HiveQL, primary key constraints are not enforced directly. However, you can use the dropDuplicates() method to remove duplicate rows based on one or more columns.
           df = df.dropDuplicates(["id"])
Create Table with Auto Increment constraint
  SQL:     CREATE TABLE table_name(
             id INT AUTO_INCREMENT,
             name VARCHAR(255),
             PRIMARY KEY (id));
  PySpark: Auto-increment is not natively supported by the DataFrame API, but there are several ways to achieve similar functionality.
           from pyspark.sql.functions import monotonically_increasing_id
           df = df.withColumn("id", monotonically_increasing_id() + start_value)
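One caveat worth noting: monotonically_increasing_id() guarantees unique, increasing ids but not consecutive ones. When gap-free ids are required, a row_number() window is a common workaround (a sketch; the ordering column is an assumption):

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import row_number

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a",), ("b",), ("c",)], ["name"])

# Consecutive ids 1..n, ordered by a chosen column (here: name).
# An unpartitioned window pulls all rows to one partition, so use with care on large data.
df = df.withColumn("id", row_number().over(Window.orderBy("name")))
df.show()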
Adding a column
  SQL:     ALTER TABLE table_name ADD column_name datatype;
  PySpark: from pyspark.sql.functions import lit
           df = df.withColumn("column_name", lit(None).cast("datatype"))

Modifying a column
  SQL:     ALTER TABLE table_name MODIFY column_name datatype;
  PySpark: df = df.withColumn("column_name", df["column_name"].cast("datatype"))

Dropping a column
  SQL:     ALTER TABLE table_name DROP COLUMN column_name;
  PySpark: df = df.drop("column_name")
Rename a column
  SQL:     ALTER TABLE table_name RENAME COLUMN old_column_name TO new_column_name;
           In MySQL:
           ALTER TABLE employees CHANGE COLUMN first_name first_name_new VARCHAR(255);
  PySpark: df = df.withColumnRenamed("existing_column", "new_column")
https://www.linkedin.com/in/girish-gowda-8a58601b9/
Girish Gowda | LinkedIn