Azure Synapse Lab Guide
NOTE: You may need to log out of the account you are already signed in with before
you can access the target workspace. You can also try using a new InPrivate browser
window.
If this is your first time connecting to the Synapse Analytics workspace, you may
see the Getting started prompt. Select Close to continue.
Once the Synapse Studio workspace is loaded, navigate to the Manage hub.
From the center menu, select SQL pools from beneath the Analytics pools heading.
Locate SQLPool01, and select the Resume button.
The Manage menu item is selected, with SQL pools selected from the center menu. The
resume button is selected next to the SQLPool01 item.
After connecting to the Synapse Analytics workspace, navigate to the Manage hub.
Open Linked services and select + New to create a new linked service. Select Azure
Cosmos DB (SQL API) in the list of options, then select Continue.
Manage, New, and the Azure Cosmos DB linked service option are highlighted.
Name the linked service asacosmosdb01. Set the Account selection method to From
Azure subscription and select the Azure Labs X subscription. For the Azure Cosmos DB
account name, select asacosmosdb{Suffix}, and set the Database name value to
CustomerProfile.
With the Workspace tab selected under Data, select + in the toolbar, then select
Integration dataset to create a new dataset.
Create a new Azure Cosmos DB (SQL API) dataset with the following characteristics:
After creating the dataset, navigate to its Connection tab, then select Preview
data.
Preview data queries the selected Azure Cosmos DB collection and returns a sample
of the documents within. The documents are stored in JSON format and include a
userId field, cartId, preferredProducts (an array of product IDs that may be
empty), and productReviews (an array of written product reviews that may be empty).
We will use this data in lab 2.
Select the Schema tab, then select Import schema. Synapse Analytics evaluates the
JSON documents within the collection and infers the schema based on the nature of
the data within. Since we are only storing one document type in this collection,
you will see the inferred schema for all documents within.
Remaining in the Data Hub, on the Data blade, expand the + menu and select
Integration dataset. Create a new Azure Data Lake Storage Gen2 dataset with the
Parquet format type with the following characteristics:
Remaining in the Data Hub, on the Data blade, expand the + menu and select
Integration dataset. Create a new Azure Data Lake Storage Gen2 integration dataset
with the JSON format type with the following characteristics:
Name: Enter asal400_ecommerce_userprofiles_source.
Linked service: Select the asadatalakeXX linked service that already exists.
File path: Browse to the wwi-02/online-user-profiles-02 path.
Import schema: Select From connection/store.
Select Publish all, then Publish to save your new resources.
In Azure Synapse Analytics, you can use either the Synapse serverless SQL engine,
the big data Spark engine, or both.
In this exercise, you will explore the data lake using both options.
Select the Linked tab and expand Azure Data Lake Storage Gen2. Expand the
asaworkspaceXX primary ADLS Gen2 account and select wwi-02. Navigate to the
sale-small/Year=2016/Quarter=Q4/Month=12/Day=20161231 folder, right-click the
sale-small-20161231-snappy.parquet file, and select New SQL script > Select TOP 100 rows.
Ensure Built-in is selected in the Connect to dropdown list above the query window,
then run the query. Data is loaded by the Synapse SQL serverless endpoint and
processed as if it were coming from any regular relational database.
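The generated script is not reproduced in this guide; a minimal sketch of what Synapse Studio typically generates for a Parquet file (with the BULK path pointing at the file you selected) is:

SELECT
    TOP 100 *
FROM
    OPENROWSET(
        BULK 'https://asadatalakeSUFFIX.dfs.core.windows.net/wwi-02/sale-small/Year=2016/Quarter=Q4/Month=12/Day=20161231/sale-small-20161231-snappy.parquet',
        FORMAT='PARQUET'
    ) AS [result];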
Modify the SQL query to perform aggregates and grouping operations to better
understand the data. Replace the query with the following, making sure that the
file path in OPENROWSET matches your current file path:
SELECT
    TransactionDate, ProductId,
    CAST(SUM(ProfitAmount) AS decimal(18,2)) AS [(sum) Profit],
    CAST(AVG(ProfitAmount) AS decimal(18,2)) AS [(avg) Profit],
    SUM(Quantity) AS [(sum) Quantity]
FROM
    OPENROWSET(
        BULK 'https://asadatalakeSUFFIX.dfs.core.windows.net/wwi-02/sale-small/Year=2016/Quarter=Q4/Month=12/Day=20161231/sale-small-20161231-snappy.parquet',
        FORMAT='PARQUET'
    ) AS [r]
GROUP BY r.TransactionDate, r.ProductId;
The T-SQL query above is displayed within the query window.
Now let's figure out how many records are contained within the Parquet files for
2019 data. This information is important for planning how we optimize for importing
the data into Azure Synapse Analytics. To do this, replace your query with the
following (be sure to update the name of your data lake in the BULK path by
replacing asadatalakeSUFFIX):
SELECT
    COUNT(*)
FROM
    OPENROWSET(
        BULK 'https://asadatalakeSUFFIX.dfs.core.windows.net/wwi-02/sale-small/Year=2019/*/*/*/*',
        FORMAT='PARQUET'
    ) AS [r];
Notice how we updated the path to include all Parquet files in all subfolders of
sale-small/Year=2019.
Optional: If you wish to keep this SQL script for future reference, select the
Properties button, provide a descriptive name, such as ASAL400 - Lab1 - Explore
sales data, then select Publish all.
The SQL Script properties pane is displayed with the new script name, and the
Publish all button is highlighted.
Navigate back to the sale-small-20161231-snappy.parquet file in the data lake,
right-click it, and select New notebook > Load to DataFrame. The Parquet file is
displayed with the New notebook menu item highlighted.
This generates a notebook with PySpark code that loads the data into a dataframe
and displays 100 rows with the header.
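The generated cell is not reproduced here; a minimal sketch of the PySpark code Synapse typically generates (the path and row count shown are illustrative) looks like this:

# Load the selected Parquet file into a Spark dataframe and display the first rows.
df = spark.read.load('abfss://wwi-02@asadatalakeSUFFIX.dfs.core.windows.net/sale-small/Year=2016/Quarter=Q4/Month=12/Day=20161231/sale-small-20161231-snappy.parquet', format='parquet')
display(df.limit(100))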
Note: The first time you run a notebook in a Spark pool, Synapse creates a new
session. This can take approximately 3-5 minutes.
Note: To run just the cell, either hover over the cell and select the Run cell icon
to the left of the cell, or select the cell and press Ctrl+Enter.
Create a new cell underneath by hovering over the cell and selecting the + Code
button beneath the notebook cell.
The Spark engine can analyze the Parquet files and infer the schema. To do this,
enter the following in the new cell:
df.printSchema()
Your output should look like the following:
root
|-- TransactionId: string (nullable = true)
|-- CustomerId: integer (nullable = true)
|-- ProductId: short (nullable = true)
|-- Quantity: short (nullable = true)
|-- Price: decimal(29,2) (nullable = true)
|-- TotalAmount: decimal(29,2) (nullable = true)
|-- TransactionDate: integer (nullable = true)
|-- ProfitAmount: decimal(29,2) (nullable = true)
|-- Hour: byte (nullable = true)
|-- Minute: byte (nullable = true)
|-- StoreId: short (nullable = true)
Now let's use the dataframe to perform the same grouping and aggregate query we
performed with the serverless SQL pool. Create a new cell and enter the following:
from pyspark.sql.functions import *
from pyspark.sql.types import *

profitByDateProduct = (df.groupBy("TransactionDate", "ProductId")
    .agg(
        sum("ProfitAmount").alias("(sum)ProfitAmount"),
        round(avg("Quantity"), 4).alias("(avg)Quantity"),
        sum("Quantity").alias("(sum)Quantity"))
    .orderBy("TransactionDate"))
profitByDateProduct.show(100)
We import required Python libraries to use aggregation functions and types defined
in the schema to successfully execute the query.
Task 3: Query user profile JSON data with Apache Spark in Azure Synapse Analytics
In addition to the sales data, we have customer profile data from an e-commerce
system that provides top product purchases for each visitor of the site (customer)
over the past 12 months. This data is stored within JSON files in the data lake. We
will import this data in the next lab, but let's explore it while we're in the
Spark notebook.
Create a new cell in the Spark notebook, enter the following code, replace
asadatalakeSUFFIX with your data lake name (you can find this value in the first
cell of the notebook), and execute the cell:
df = (spark.read
    .option("inferSchema", "true")
    .json("abfss://wwi-02@asadatalakeSUFFIX.dfs.core.windows.net/online-user-profiles-02/*.json", multiLine=True)
)
df.printSchema()
Your output should look like the following:
root
|-- topProductPurchases: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- itemsPurchasedLast12Months: long (nullable = true)
| | |-- productId: long (nullable = true)
|-- visitorId: long (nullable = true)
Notice that we are selecting all JSON files within the online-user-profiles-02
directory. Each JSON file contains several rows, which is why we specified the
multiLine=True option. Also, we set the inferSchema option to true, which instructs
the Spark engine to review the files and create a schema based on the nature of the
data.
We have been using Python code in these cells up to this point. If we want to query
the files using SQL syntax, one option is to create a temporary view of the data
within the dataframe. Execute the following in a new cell to create a view named
user_profiles:
%%sql
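-- The body of this cell is not shown in the source guide. A minimal sketch, assuming
-- the view was first registered from the dataframe in a separate Python cell with
-- df.createOrReplaceTempView("user_profiles"), is simply:
SELECT * FROM user_profiles LIMIT 10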
The topProductPurchases column contains an array of nested values, which makes
analyzing the data a bit difficult. This is because the JSON file contents look like
the following:
[
{
"visitorId": 9529082,
"topProductPurchases": [
{
"productId": 4679,
"itemsPurchasedLast12Months": 26
},
{
"productId": 1779,
"itemsPurchasedLast12Months": 32
},
{
"productId": 2125,
"itemsPurchasedLast12Months": 75
},
{
"productId": 2007,
"itemsPurchasedLast12Months": 39
},
{
"productId": 1240,
"itemsPurchasedLast12Months": 31
},
{
"productId": 446,
"itemsPurchasedLast12Months": 39
},
{
"productId": 3110,
"itemsPurchasedLast12Months": 40
},
{
"productId": 52,
"itemsPurchasedLast12Months": 2
},
{
"productId": 978,
"itemsPurchasedLast12Months": 81
},
{
"productId": 1219,
"itemsPurchasedLast12Months": 56
},
{
"productId": 2982,
"itemsPurchasedLast12Months": 59
}
]
},
{
...
},
{
...
}
]
PySpark contains a special explode function, which returns a new row for each
element of the array. This will help flatten the topProductPurchases column for
better readability or for easier querying. Execute the following in a new cell:
flat = df.select('visitorId', explode('topProductPurchases').alias('topProductPurchases_flat'))
flat.show(100)
In this cell, we created a new dataframe named flat that includes the visitorId
field and a new aliased field named topProductPurchases_flat. As you can see, the
output is a bit easier to read and, by extension, easier to query.
Create a new cell and execute the following code to create a new flattened version
of the dataframe that extracts the topProductPurchases_flat.productId and
topProductPurchases_flat.itemsPurchasedLast12Months fields to create new rows for
each data combination:
topPurchases = (flat.select('visitorId', 'topProductPurchases_flat.productId', 'topProductPurchases_flat.itemsPurchasedLast12Months')
    .orderBy('visitorId'))
topPurchases.show(100)
In the output, notice that we now have multiple rows for each visitorId.
Let's order the rows by the number of items purchased in the last 12 months. Create
a new cell and execute the following code:
topPurchases.orderBy("itemsPurchasedLast12Months desc")
An error is displayed, because orderBy expects a column name or Column expression
and does not parse the sort direction out of the string. Instead, use the desc()
method on the column returned by col(). Create a new cell and execute the following
code:
sortedTopPurchases = (topPurchases
.orderBy( col("itemsPurchasedLast12Months").desc() ))
sortedTopPurchases.show(100)
How many types of products did each customer purchase? To figure this out, we need
to group by visitorId and aggregate on the number of rows per customer. Execute the
following code in a new cell:
groupedTopPurchases = (sortedTopPurchases.select("visitorId")
.groupBy("visitorId")
.agg(count("*").alias("total"))
.orderBy("visitorId") )
groupedTopPurchases.show(100)
The query output is displayed.
How many total items did each customer purchase? To figure this out, we need to
group by visitorId and aggregate on the sum of itemsPurchasedLast12Months values
per customer. Execute the following code in a new cell:
groupedTopPurchases = (sortedTopPurchases.select("visitorId", "itemsPurchasedLast12Months")
    .groupBy("visitorId")
    .agg(sum("itemsPurchasedLast12Months").alias("totalItemsPurchased"))
    .orderBy("visitorId"))
groupedTopPurchases.show(100)
The query output is displayed.
Exercise 3: Import sales data with PolyBase and COPY using T-SQL
There are different options for loading large amounts and varying types of data
into Azure Synapse Analytics, such as through T-SQL commands using a Synapse SQL
Pool, and with Azure Synapse pipelines. In our scenario, Wide World Importers
stores most of their raw data in a data lake and in different formats. Among the
data loading options available to them, WWI's data engineers are most comfortable
using T-SQL.
However, even with their familiarity with SQL, there are some things to consider
when loading large or disparate file types and formats. Since the files are stored
in ADLS Gen2, WWI can use either PolyBase external tables or the new COPY
statement. Both options enable fast and scalable data load operations, but there
are some differences between the two:
PolyBase                      | COPY
----------------------------- | ------------------------------------------
GA, stable                    | GA, stable
Needs CONTROL permission      | Relaxed permission
Has row width limits          | No row width limit
No delimiters within text     | Supports delimiters in text
Fixed line delimiter          | Supports custom column and row delimiters
Complex to set up in code     | Reduces amount of code
WWI has heard that PolyBase is generally faster than COPY, especially when working
with large data sets.
In this exercise, you will help WWI compare ease of setup, flexibility, and speed
between these loading strategies.
In this task, you will create a new staging table named SaleHeap in a new schema
named wwi_staging. You will define it as a heap and use round-robin distribution.
When WWI finalizes their data loading pipeline, they will load the data into
SaleHeap, then insert from the heap table into Sale. Although this is a two-step
process, the second step of inserting the rows to the production table does not
incur data movement across the distributions.
You will also create a new Sale clustered columnstore table within the wwi_staging
schema to compare data load speeds.
From the Develop menu, select the + button and choose SQL Script from the context
menu.
In the toolbar menu, connect to the Dedicated SQL Pool resource to execute the
query.
In the query window, replace the script with the following to create the
wwi_staging schema:
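The script itself is not included in this guide; a minimal sketch that creates the schema is:

CREATE SCHEMA [wwi_staging];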
Note: If you receive the following error, continue to the next step: Failed to
execute query. Error: There is already an object named 'wwi_staging' in the
database. CREATE SCHEMA failed due to previous errors.
In the query window, replace the script with the following to create the heap
table:
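The table definition is not reproduced in this guide. A minimal sketch follows; the column names and types mirror the Parquet schema inferred earlier in the lab, and storing TransactionId as uniqueidentifier is an assumption:

CREATE TABLE [wwi_staging].[SaleHeap]
(
    [TransactionId] [uniqueidentifier] NOT NULL,
    [CustomerId] [int] NOT NULL,
    [ProductId] [smallint] NOT NULL,
    [Quantity] [smallint] NOT NULL,
    [Price] [decimal](29,2) NOT NULL,
    [TotalAmount] [decimal](29,2) NOT NULL,
    [TransactionDate] [int] NOT NULL,
    [ProfitAmount] [decimal](29,2) NOT NULL,
    [Hour] [tinyint] NOT NULL,
    [Minute] [tinyint] NOT NULL,
    [StoreId] [smallint] NOT NULL
)
WITH
(
    DISTRIBUTION = ROUND_ROBIN,
    HEAP
);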
In the query window, replace the script with the following to create the Sale table
in the wwi_staging schema for load comparisons:
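Again, the script is not reproduced here. A minimal sketch uses the same columns as SaleHeap with a clustered columnstore index; the round-robin distribution shown below is an assumption:

CREATE TABLE [wwi_staging].[Sale]
(
    [TransactionId] [uniqueidentifier] NOT NULL,
    [CustomerId] [int] NOT NULL,
    [ProductId] [smallint] NOT NULL,
    [Quantity] [smallint] NOT NULL,
    [Price] [decimal](29,2) NOT NULL,
    [TotalAmount] [decimal](29,2) NOT NULL,
    [TransactionDate] [int] NOT NULL,
    [ProfitAmount] [decimal](29,2) NOT NULL,
    [Hour] [tinyint] NOT NULL,
    [Minute] [tinyint] NOT NULL,
    [StoreId] [smallint] NOT NULL
)
WITH
(
    DISTRIBUTION = ROUND_ROBIN,
    CLUSTERED COLUMNSTORE INDEX
);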
To load the data with PolyBase, you need to create the following objects:
An external data source that points to the abfss path in ADLS Gen2 where the
Parquet files are located
An external file format for Parquet files
An external table that defines the schema for the files, as well as the location,
data source, and file format
In the query window, replace the script with the following to create the external
data source. Be sure to replace SUFFIX with the lab workspace id:
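The script is not reproduced here; a minimal sketch (the data source name is illustrative) is:

CREATE EXTERNAL DATA SOURCE ExternalDataSourceABFS
WITH
(
    TYPE = HADOOP,
    LOCATION = 'abfss://wwi-02@asadatalakeSUFFIX.dfs.core.windows.net'
);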
In the query window, replace the script with the following to create the external
file format and external table. Notice that we define TransactionId as an
nvarchar(36) field instead of uniqueidentifier, because external tables do not
currently support uniqueidentifier columns:
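A minimal sketch of these two objects follows; the file format and external table names are illustrative, the external table points at the 2019 sale-small folder used in this exercise, and it references the ExternalDataSourceABFS data source sketched above:

CREATE EXTERNAL FILE FORMAT [ParquetFileFormat]
WITH
(
    FORMAT_TYPE = PARQUET,
    DATA_COMPRESSION = 'org.apache.hadoop.io.compress.SnappyCodec'
);
GO

CREATE EXTERNAL TABLE [wwi_staging].[SaleExternal]
(
    [TransactionId] [nvarchar](36) NOT NULL,
    [CustomerId] [int] NOT NULL,
    [ProductId] [smallint] NOT NULL,
    [Quantity] [smallint] NOT NULL,
    [Price] [decimal](29,2) NOT NULL,
    [TotalAmount] [decimal](29,2) NOT NULL,
    [TransactionDate] [int] NOT NULL,
    [ProfitAmount] [decimal](29,2) NOT NULL,
    [Hour] [tinyint] NOT NULL,
    [Minute] [tinyint] NOT NULL,
    [StoreId] [smallint] NOT NULL
)
WITH
(
    LOCATION = '/sale-small/Year=2019',
    DATA_SOURCE = ExternalDataSourceABFS,
    FILE_FORMAT = [ParquetFileFormat]
);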
Select Run from the toolbar menu to execute the SQL command.
In the query window, replace the script with the following to load the data into
the wwi_staging.SaleHeap table:
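A minimal sketch of the load, assuming the SaleExternal external table sketched above, is:

INSERT INTO [wwi_staging].[SaleHeap]
SELECT *
FROM [wwi_staging].[SaleExternal];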
In the query window, replace the script with the following to see how many rows
were imported:
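A minimal sketch of the row count check (the same check can be reused after each of the following loads, substituting the target table) is:

SELECT COUNT(1) FROM wwi_staging.SaleHeap;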
In the query window, replace the script with the following to truncate the heap
table and load data using the COPY statement. Be sure to replace SUFFIX with the id
from your workspace:
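A minimal sketch of this step is shown below; with COPY, the Parquet files are loaded directly from the data lake without any external objects:

TRUNCATE TABLE wwi_staging.SaleHeap;
GO

COPY INTO wwi_staging.SaleHeap
FROM 'https://asadatalakeSUFFIX.dfs.core.windows.net/wwi-02/sale-small/Year=2019'
WITH
(
    FILE_TYPE = 'PARQUET',
    COMPRESSION = 'SNAPPY'
);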
In the query window, replace the script with the following to see how many rows
were imported:
In the query window, replace the script with the following to load data into the
clustered columnstore Sale table using the COPY statement. Be sure to replace
SUFFIX with the id for your workspace:
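A minimal sketch, identical to the previous COPY statement except for the target table, is:

COPY INTO wwi_staging.Sale
FROM 'https://asadatalakeSUFFIX.dfs.core.windows.net/wwi-02/sale-small/Year=2019'
WITH
(
    FILE_TYPE = 'PARQUET',
    COMPRESSION = 'SNAPPY'
);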
In the query window, replace the script with the following to see how many rows
were imported:
What were the results? Did the load operation take more or less time writing to
Sale table vs. the heap (SaleHeap) table?
PolyBase vs. COPY (DW2000), loading the 2019 small data set (339,507,246 rows):
WWI has a nightly process that ingests regional sales data from a partner analytics
system and saves the files in the data lake. The text files use non-standard column
and row delimiters, where columns are delimited by a period (.) and rows by a comma (,):
20200421.114892.130282.159488.172105.196533,20200420.109934.108377.122039.101946.100712,20200419.253714.357583.452690.553447.653921
The data has the following fields: Date, NorthAmerica, SouthAmerica, Europe,
Africa, and Asia. They must process this data and store it in Synapse Analytics.
In the query window, replace the script with the following to create the
DailySalesCounts table and load data using the COPY statement. Be sure to replace
<PrimaryStorage> with the default storage account name for your workspace:
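A minimal sketch follows; the source file path is a placeholder for the regional sales file in your lab environment, and the table columns follow the field list above:

CREATE TABLE [wwi_staging].[DailySalesCounts]
(
    [Date] [int] NOT NULL,
    [NorthAmerica] [int] NOT NULL,
    [SouthAmerica] [int] NOT NULL,
    [Europe] [int] NOT NULL,
    [Africa] [int] NOT NULL,
    [Asia] [int] NOT NULL
)
WITH
(
    DISTRIBUTION = ROUND_ROBIN,
    HEAP
);
GO

COPY INTO wwi_staging.DailySalesCounts
FROM 'https://<PrimaryStorage>.dfs.core.windows.net/<container>/<path-to-daily-counts-file>'
WITH
(
    FILE_TYPE = 'CSV',
    FIELDTERMINATOR = '.',
    ROWTERMINATOR = ','
);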
Select Run from the toolbar menu to execute the SQL command.
In the query window, replace the script with the following to view the imported
data:
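A minimal sketch of such a query is:

SELECT * FROM wwi_staging.DailySalesCounts ORDER BY [Date] DESC;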
Try viewing the results in a Chart, setting the Category column to Date.
Task 6: Use PolyBase to load a text file with non-standard row delimiters
Let's try this same operation using PolyBase.
In the query window, replace the script with the following to create a new external
file format, external table, and load data using PolyBase:
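These scripts are not reproduced here. A minimal sketch follows; the object names and the file path are illustrative, and it reuses the ExternalDataSourceABFS data source sketched earlier (assuming the file lives in the same storage container; otherwise create a data source for its location). PolyBase's DELIMITEDTEXT format accepts a custom field terminator but not a custom row terminator, which is what leads to the error described below:

CREATE EXTERNAL FILE FORMAT [CsvDailySalesFormat]
WITH
(
    FORMAT_TYPE = DELIMITEDTEXT,
    FORMAT_OPTIONS
    (
        FIELD_TERMINATOR = '.',
        USE_TYPE_DEFAULT = FALSE
    )
);
GO

CREATE EXTERNAL TABLE [wwi_staging].[DailySalesCountsExternal]
(
    [Date] [int] NOT NULL,
    [NorthAmerica] [int] NOT NULL,
    [SouthAmerica] [int] NOT NULL,
    [Europe] [int] NOT NULL,
    [Africa] [int] NOT NULL,
    [Asia] [int] NOT NULL
)
WITH
(
    LOCATION = '<path-to-daily-counts-file>',
    DATA_SOURCE = ExternalDataSourceABFS,
    FILE_FORMAT = [CsvDailySalesFormat]
);
GO

SELECT COUNT(1) FROM [wwi_staging].[DailySalesCountsExternal];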
You should see an error similar to: Failed to execute query. Error:
HdfsBridge::recordReaderFillBuffer - Unexpected error encountered filling record
reader buffer: HadoopExecutionException: Too many columns in the line.
In this exercise, we will focus on the orchestration aspect. Lab 2 will focus more
on the transformation (data flow) pipelines. You will create a new pipeline to
import a large Parquet file, following best practices to improve the load
performance.
Be sure to allocate enough memory to the pipeline session. To do this, increase the
resource class of a user that has permissions to rebuild the index on this table to
the recommended minimum.
To run loads with appropriate compute resources, create loading users designated
for running loads. Assign each loading user to a specific resource class or
workload group. To run a load, sign in as one of the loading users, and then run
the load. The load runs with the user's resource class.
In the query window, replace the script with the following to create a workload
group, BigDataLoad, that uses workload isolation by reserving a minimum of 50%
resources with a cap of 100%:
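A minimal sketch of this statement follows; REQUEST_MIN_RESOURCE_GRANT_PERCENT is required by the CREATE WORKLOAD GROUP syntax, and the value of 25 used here is an assumption:

CREATE WORKLOAD GROUP BigDataLoad
WITH
(
    MIN_PERCENTAGE_RESOURCE = 50,
    REQUEST_MIN_RESOURCE_GRANT_PERCENT = 25,
    CAP_PERCENTAGE_RESOURCE = 100
);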
In the query window, replace the script with the following to create a new workload
classifier, HeavyLoader, that assigns the asa.sql.import01 user created in your
environment to the BigDataLoad workload group. At the end, we select from
sys.workload_management_workload_classifiers to view all classifiers, including the
one we just created:
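A minimal sketch of the classifier and the follow-up query is:

CREATE WORKLOAD CLASSIFIER HeavyLoader
WITH
(
    WORKLOAD_GROUP = 'BigDataLoad',
    MEMBERNAME = 'asa.sql.import01'
);
GO

SELECT * FROM sys.workload_management_workload_classifiers;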
Locate and select a linked service named sqlpool01_import01. Notice that the user
name for the SQL Pool connection is the asa.sql.import01 user we added to the
HeavyLoader classifier. We will use this linked service in our new pipeline to
reserve resources for the data load activity.
In the Properties pane for the new pipeline, enter the following Name: ASAL400 -
Copy December Sales.
Expand Move & transform within the Activities list, then drag the Copy data
activity onto the pipeline canvas.
Select the Copy data activity on the canvas and set the Name to Copy Sales.
Select the Source tab, then select + New next to Source dataset.
Select the Azure Data Lake Storage Gen2 data store, then select Continue.
Select the Sink tab, then select + New next to Sink dataset.
Select the Azure Synapse Analytics data store, then select Continue.
In the Sink tab, select the Copy command copy method and enter the following in the
pre-copy script to clear the table before import: TRUNCATE TABLE
wwi_staging.SaleHeap.
Select the Mapping tab and select Import schemas to create mappings for each source
and destination field.
Select Settings and set the Data integration unit to 8. This is required due to the
large size of the source Parquet file.
Select Add trigger, then Trigger now. Select OK in the pipeline run trigger to
begin.
Select Pipeline Runs. You can see the status of your pipeline run here. Note that
you may need to refresh the view. Once the pipeline run is complete, you can query
the wwi_staging.SaleHeap table to view the imported data.
The query optimizer in a dedicated SQL pool relies on statistics to estimate how
many rows a query will return and to choose an execution plan accordingly. For
example, if the optimizer estimates that the date your query is filtering on will
return one row, it will choose one plan. If it estimates that the selected date will
return 1 million rows, it will choose a different plan. Run the following query to
see the CREATE STATISTICS requests that have been executed, including those issued
automatically:
SELECT
*
FROM
sys.dm_pdw_exec_requests
WHERE
Command like '%CREATE STATISTICS%'
Notice the special name pattern used for automatically created statistics; their
names are prefixed with _WA_Sys_.
To determine if there are any statistics created for a specific column, you can
leverage DBCC SHOW_STATISTICS. In our scenario, let's determine the statistics set
on the CustomerId column from the wwi_perf.Sale_Hash table.
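A minimal sketch of such a check (the WITH HISTOGRAM option limits the output to the histogram steps) is:

DBCC SHOW_STATISTICS ('wwi_perf.Sale_Hash', CustomerId) WITH HISTOGRAM;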
Azure Synapse Analytics dedicated SQL pools have 60 storage distributions. When
selecting a distribution key for a hash-distributed table, the goal is to select the
column that distributes the data most evenly.
Use DBCC PDW_SHOWSPACEUSED() to find data skew. In a new SQL script tab, copy,
paste and execute the following query:
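A minimal sketch follows; the table name is illustrative, so substitute the table you want to inspect:

DBCC PDW_SHOWSPACEUSED('wwi_perf.Sale_Hash');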
By leveraging Azure Synapse Analytics Dynamic Management Views (DMVs), you can
perform a more detailed skew analysis. Copy, paste, and execute the following
script to create a view to identify which tables have data skew.
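The lab's exact view definition is not reproduced here. A simplified sketch that exposes the columns queried in the next step, adapted from the standard dedicated SQL pool table-size DMV pattern, could look like this:

CREATE VIEW [dbo].[vDistributionSkew]
AS
SELECT
      QUOTENAME(s.[name]) + '.' + QUOTENAME(t.[name]) AS [two_part_name]
    , s.[name]             AS [schema_name]
    , t.[name]             AS [table_name]
    , ps.[distribution_id] AS [distribution_id]
    , SUM(ps.[row_count])  AS [row_count]
    -- Page counts are reported in 8 KB pages; convert them to GB.
    , CAST(SUM(ps.[reserved_page_count]) * 8 / 1024.0 / 1024.0 AS decimal(18,2)) AS [reserved_space_GB]
    , CAST((SUM(ps.[reserved_page_count]) - SUM(ps.[used_page_count])) * 8 / 1024.0 / 1024.0 AS decimal(18,2)) AS [unused_space_GB]
    , CAST(SUM(ps.[in_row_data_page_count] + ps.[row_overflow_used_page_count] + ps.[lob_used_page_count]) * 8 / 1024.0 / 1024.0 AS decimal(18,2)) AS [data_space_GB]
    , CAST((SUM(ps.[used_page_count]) - SUM(ps.[in_row_data_page_count] + ps.[row_overflow_used_page_count] + ps.[lob_used_page_count])) * 8 / 1024.0 / 1024.0 AS decimal(18,2)) AS [index_space_GB]
FROM sys.dm_pdw_nodes_db_partition_stats ps
JOIN sys.pdw_nodes_tables nt
    ON ps.[object_id] = nt.[object_id]
    AND ps.[pdw_node_id] = nt.[pdw_node_id]
    AND ps.[distribution_id] = nt.[distribution_id]
JOIN sys.pdw_table_mappings tm
    ON nt.[name] = tm.[physical_name]
JOIN sys.tables t
    ON tm.[object_id] = t.[object_id]
JOIN sys.schemas s
    ON t.[schema_id] = s.[schema_id]
GROUP BY s.[name], t.[name], ps.[distribution_id];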
SELECT
[two_part_name]
,[distribution_id]
,[row_count]
,[reserved_space_GB]
,[unused_space_GB]
,[data_space_GB]
,[index_space_GB]
FROM [dbo].[vDistributionSkew]
WHERE [table_name] = 'Sale'
AND [schema_name] = 'wwi_staging'
ORDER BY [row_count] DESC
To decide if you should resolve data skew in a table, you should understand as much
as possible about the data volumes and queries in your workload.
Distributing data is a matter of finding the right balance between minimizing data
skew and minimizing data movement. These can be opposing goals, and sometimes you
will want to keep data skew in order to reduce data movement. For example, when the
distribution column is frequently the shared column in joins and aggregations, you
will be minimizing data movement. The benefit of having the minimal data movement
might outweigh the impact of having data skew.
Navigate to the Manage hub. From the center menu, select SQL pools from beneath the
Analytics pools heading. Locate SQLPool01, and select the Pause button.
The Manage menu item is selected, with SQL pools selected from the center menu. The
Pause button is selected next to the SQLPool01 item.