Databricks Delta Guide
Note
Databricks Delta is in Preview.
Use this guide to learn about Databricks Delta, a powerful transactional storage
layer that harnesses the power of Apache Spark and Databricks DBFS.
Introduction to Databricks Delta
o Requirements
o Frequently asked questions (FAQ)
Databricks Delta Quickstart
o Create a table
o Read a table
o Append data to a table
o Stream data into a table
o Optimize a table
o Clean up snapshots
Table Batch Reads and Writes
o Create a table
o Read a table
o Write to a table
o Schema validation
o Update table schema
o Replace table schema
o Views on tables
o Table properties
o Table metadata
Table Streaming Reads and Writes
o As a source
o As a sink
Optimizing Performance and Cost
o Compaction (bin-packing)
o ZOrdering (multi-dimensional clustering)
o Data skipping
o Garbage collection
o Improving performance for interactive queries
o Frequently asked questions (FAQ)
Table Versioning
Concurrency Control and Isolation Levels in Databricks Delta
o Optimistic Concurrency Control
o Isolation levels
Porting Existing Workloads to Databricks Delta
o Example
Introduction to Databricks Delta
Note
Databricks Delta is in Preview.
Databricks Delta delivers a powerful transactional storage layer by harnessing
the power of Apache Spark and Databricks DBFS. The core abstraction of
Databricks Delta is an optimized Spark table that:
Stores data as Parquet files in DBFS.
Maintains a transaction log that efficiently tracks changes to the table.
You read and write data stored in the delta format using the same familiar
Apache Spark SQL batch and streaming APIs that you use to work with Hive
tables and DBFS directories. With the addition of the transaction log and other
enhancements, Databricks Delta offers significant benefits:
ACID transactions
Multiple writers can simultaneously modify a dataset and see consistent
views. For qualifications, see Multi-cluster writes.
Writers can modify a dataset without interfering with jobs reading the
dataset.
Fast read access
Automatic file management organizes data into large files that can be
read efficiently.
Statistics speed up reads by 10-100x, and data skipping avoids reading irrelevant information.
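For example, the familiar batch APIs work unchanged; only the format name changes. The following is a minimal sketch, and the /tmp/delta-demo path is illustrative:
// Write a small DataFrame in the delta format and read it back.
// The /tmp/delta-demo path is illustrative; any DBFS location works.
val df = spark.range(0, 5).toDF("id")
df.write.format("delta").save("/tmp/delta-demo")

val reread = spark.read.format("delta").load("/tmp/delta-demo")
reread.show()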
Requirements
Databricks Delta requires Databricks Runtime 4.1 or above. If you created a
Databricks Delta table using a Databricks Runtime lower than 4.1, the table
version must be upgraded. For details, see Table Versioning.
Frequently asked questions (FAQ)
How do Databricks Delta tables compare to Hive SerDe tables?
Databricks Delta tables are managed to a greater degree. In particular,
there are several Hive SerDe parameters that Databricks Delta manages
on your behalf that you should never specify manually:
ROW FORMAT
SERDE
OUTPUTFORMAT and INPUTFORMAT
COMPRESSION
STORED AS
Does Databricks Delta support multi-table transactions?
Databricks Delta does not support multi-table transactions and foreign
keys. Databricks Delta supports transactions at the table level.
Does Databricks Delta support writes or reads using the Spark
Streaming DStream API?
Databricks Delta does not support the DStream API. We recommend
Structured Streaming.
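For example, a Databricks Delta table can be read as a Structured Streaming source instead of a DStream. The following is a minimal sketch; the /data/events path and the console sink are used only for illustration:
// Read a Databricks Delta table as a streaming source.
val events = spark.readStream
  .format("delta")
  .load("/data/events")

// Write the stream to the console sink for illustration;
// in practice you would write to another table or sink.
events.writeStream
  .format("console")
  .outputMode("append")
  .start()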
What DDL and DML features does Databricks Delta not support?
Unsupported DDL features:
o ANALYZE TABLE PARTITION
o ALTER TABLE [ADD|DROP] PARTITION
o ALTER TABLE SET LOCATION
o ALTER TABLE RECOVER PARTITIONS
o ALTER TABLE SET SERDEPROPERTIES
o CREATE TABLE LIKE
o INSERT OVERWRITE DIRECTORY
o LOAD DATA
Unsupported DML features:
o INSERT INTO [OVERWRITE] with static partitions.
o Bucketing.
o Specifying a schema when reading from a table. A command such as spark.read.format("delta").schema(df.schema).load(path) will fail.
o Specifying target partitions
using PARTITION (part_spec) in TRUNCATE TABLE.
What does it mean that Databricks Delta supports multi-cluster writes?
It means that Databricks Delta uses locking to ensure that queries writing to a table from multiple clusters at the same time do not corrupt the table. However, it does not mean that conflicting writes (for example, an update and a delete of the same record) will both succeed. Instead, one of the writes fails atomically and the error tells you to retry the operation.
What are the limitations of multi-cluster writes?
Databricks Delta supports transactional writes from multiple clusters in the
same workspace in Databricks Runtime 4.2 and above. All writers must be
running Databricks Runtime 4.2 or above. The following features are not
supported when running in this mode:
SparkR
spark-submit jobs
Running a command using REST APIs
Client-side S3 encryption
Server-Side Encryption with Customer-Provided Encryption Keys
S3 paths with credentials in a cluster that cannot access AWS Security
Token Service
You can disable multi-cluster writes by setting spark.databricks.delta.multiClusterWrites.enabled to false. If they are disabled, writes to a single table must originate from a single cluster.
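For example, a minimal sketch of disabling multi-cluster writes from a notebook, assuming the setting is applied as a session configuration (it can also be set in the cluster Spark config):
// Disable transactional multi-cluster writes for this session.
// Afterwards, writes to a given table must come from a single cluster.
spark.conf.set("spark.databricks.delta.multiClusterWrites.enabled", "false")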
Warning
You cannot concurrently modify the same Databricks Delta table
from different workspaces.
Writes to a single table using Databricks Runtime versions lower than
4.2 must originate from a single cluster. To perform transactional writes
from multiple clusters in the same workspace, you must upgrade to
Databricks Runtime 4.2.
Why is Databricks Delta data I deleted still stored in S3?
If you are using Databricks Delta and have enabled bucket versioning, you have two entities managing table files: Databricks Delta and AWS. To ensure that data is fully deleted, you must:
Clean up deleted files that are no longer in the Databricks Delta
transaction log using VACUUM
Enable an S3 lifecycle policy for versioned objects that ensures that old
versions of deleted files are purged
Can I access Databricks Delta tables outside of Databricks Runtime?
There are two cases to consider: external writes and external reads.
External writes: Databricks Delta maintains additional metadata in the
form of a transaction log to enable ACID transactions and snapshot
isolation for readers. In order to ensure the transaction log is updated
correctly and the proper validations are performed, writes must go
through Databricks Runtime.
External reads: Databricks Delta tables store data encoded in an open
format (Parquet), allowing other tools that understand this format to
read the data. However, since other tools do not support Databricks
Delta's transaction log, it is likely that they will incorrectly read stale
deleted data, uncommitted data, or the partial results of failed
transactions.
In cases where the data is static (that is, there are no active jobs writing
to the table), you can use VACUUM with a retention of ZERO HOURS to
clean up any stale Parquet files that are not currently part of the table.
This operation puts the Parquet files present in DBFS into a consistent
state such that they can now be read by external tools.
However, Databricks Delta relies on stale snapshots for the following functionality, which breaks when you run VACUUM with zero retention:
o Snapshot isolation for readers - Long-running jobs continue to read a consistent snapshot from the moment they started, even if the table is modified concurrently. Running VACUUM with a retention shorter than the length of these jobs can cause them to fail with a FileNotFoundException.
o Streaming from Databricks Delta tables - Streams read from the original files written into a table in order to ensure exactly-once processing. When combined with OPTIMIZE, VACUUM with zero retention can remove these files before the stream has time to process them, causing it to fail.
For these reasons, we recommend using the above technique only on static datasets that must be read by external tools.
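As a sketch, cleaning up a static table for external readers might look like the following. It assumes no jobs or streams are writing to the table, and the /data/events path is illustrative; depending on your Databricks Runtime version, a safety check may need to be relaxed before a zero-hour retention is accepted:
// Only safe when the table is static: no active writers or streams.
// Removes Parquet files that are no longer part of the current table version.
spark.sql("VACUUM delta.`/data/events` RETAIN 0 HOURS")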
Databricks Delta Quickstart
This quickstart demonstrates the basics of working with Databricks Delta. It shows how to build a pipeline that reads JSON data into a Databricks Delta table and then appends additional data. The topic includes example notebooks that demonstrate these basic Databricks Delta operations.
In this topic:
Create a table
Read a table
Append data to a table
o Example notebooks
Stream data into a table
Optimize a table
Clean up snapshots
Create a table
Create a table from a dataset. You can use existing Spark SQL code and change
the format from parquet, csv, json, and so on, to delta.
Scala
val events = spark.read.json("/data/events")
events.write.format("delta").save("/data/events")
SQL
CREATE TABLE events
USING delta
AS SELECT *
FROM json.`/data/events/`
These operations create a new table using the schema that was inferred from the
JSON data. For the full set of options available when you create a new
Databricks Delta table, see Create a table and Write to a table.
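If your queries usually filter on a particular column, you can also partition the table when you create it. This sketch uses the standard DataFrameWriter partitionBy option; the date column and the /data/events_by_date path are assumptions for illustration:
// Create a table partitioned by an assumed `date` column.
events.write
  .format("delta")
  .partitionBy("date")
  .save("/data/events_by_date")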
Read a table
You access data in Databricks Delta tables either by specifying the path on
DBFS ("/data/events") or the table name ("events"):
Scala
val events = spark.read.format("delta").load("/data/events")
or
val events = spark.table("events")
SQL
SELECT * FROM delta.`/data/events`
or
SELECT * FROM events
Append data to a table
As new events arrive, you can atomically append them to the table:
Scala
newEvents.write
.format("delta")
.mode("append")
.save("/data/events")
or
newEvents.write
.format("delta")
.mode("append")
.saveAsTable("events")
SQL
INSERT INTO events VALUES(...)
or
INSERT INTO events SELECT * FROM newEvents
For an example of how to create a Databricks Delta table and append to it, see one of the following notebooks:
Example notebooks
Python notebook
Scala notebook
SQL notebook
Stream data into a table
You can also use Structured Streaming to stream new data as it arrives into the
table:
Copy
events = spark.readStream.json("/data/events")
events.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/delta/events/_checkpoint/etl-from-json")
.start("/delta/events")
For more information about Databricks Delta integration with Structured
Streaming, see Table Streaming Reads and Writes.
Optimize a table
Once you have been streaming for a while, you will likely have a lot of small files
in the table. If you want to improve the speed of read queries, you can
use OPTIMIZE to collapse small files into larger ones:
OPTIMIZE delta.`/data/events`
or
OPTIMIZE events
You can also specify columns that frequently appear in query predicates for your workload, and Databricks Delta uses this information to cluster related records together:
OPTIMIZE events ZORDER BY eventType, city
For the full set of options available when running OPTIMIZE, see Optimizing
Performance and Cost.
Clean up snapshots
Databricks Delta provides snapshot isolation for reads, which means that it is
safe to run OPTIMIZE even while other users or jobs are querying the table.
Eventually you should clean up old snapshots. You can do this by running
the VACUUM command:
VACUUM events
You control how long snapshots are retained by using the RETAIN <N> HOURS option:
VACUUM events RETAIN 24 HOURS
For details on using VACUUM effectively, see Garbage collection.