Google BigQuery I/O connector
- Java SDK
- Python SDK
The Beam SDKs include built-in transforms that can read data from and write data to Google BigQuery tables.
Before you start
To use BigQueryIO, add the Maven artifact dependency to your pom.xml
file.
Additional resources:
To use BigQueryIO, you must install the Google Cloud Platform dependencies by
running pip install apache-beam[gcp]
.
Additional resources:
BigQuery basics
Table names
To read or write from a BigQuery table, you must provide a fully-qualified
BigQuery table name (for example, bigquery-public-data:github_repos.sample_contents
).
A fully-qualified BigQuery table name consists of three parts:
- Project ID: The ID for your Google Cloud Project. The default value comes from your pipeline options object.
- Dataset ID: The BigQuery dataset ID, which is unique within a given Cloud Project.
- Table ID: A BigQuery table ID, which is unique within a given dataset.
A table name can also include a table decorator if you are using time-partitioned tables.
To specify a BigQuery table, you can use either the table’s fully-qualified name as a string, or use a TableReference TableReference object.
Using a string
To specify a table with a string, use the format
[project_id]:[dataset_id].[table_id]
or [project_id].[dataset_id].[table_id]
to specify the fully-qualified BigQuery table name.
You can also omit project_id
and use the [dataset_id].[table_id]
format. If
you omit the project ID, Beam uses the default project ID from your
pipeline options.
pipeline options.
Using a TableReference
To specify a table with a TableReference
, create a new TableReference
using
the three parts of the BigQuery table name.
The Beam SDK for Java also provides the parseTableSpec
helper method, which constructs a TableReference
object from a String that
contains the fully-qualified BigQuery table name. However, the static factory
methods for BigQueryIO transforms accept the table name as a String and
construct a TableReference
object for you.
Table rows
BigQueryIO read and write transforms produce and consume data as a PCollection
of dictionaries, where each element in the PCollection
represents a single row
in the table.
Schemas
When writing to BigQuery, you must supply a table schema for the destination
table that you want to write to, unless you specify a create
disposition of CREATE_NEVER
. Creating a table
schema covers schemas in more detail.
Data types
BigQuery supports the following data types: STRING, BYTES, INTEGER, FLOAT, NUMERIC, BOOLEAN, TIMESTAMP, DATE, TIME, DATETIME and GEOGRAPHY. For an overview of Google Standard SQL data types, see Data types. BigQueryIO allows you to use all of these data types. The following example shows the correct format for data types used when reading from and writing to BigQuery:
import com.google.api.services.bigquery.model.TableRow;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Base64;
import java.util.stream.Collectors;
import java.util.stream.Stream;
class BigQueryTableRowCreate {
public static TableRow createTableRow() {
TableRow row =
new TableRow()
// To learn more about BigQuery data types:
// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
.set("string_field", "UTF-8 strings are supported! 🌱🌳🌍")
.set("int64_field", 432)
.set("float64_field", 3.141592653589793)
.set("numeric_field", new BigDecimal("1234.56").toString())
.set("bool_field", true)
.set(
"bytes_field",
Base64.getEncoder()
.encodeToString("UTF-8 byte string 🌱🌳🌍".getBytes(StandardCharsets.UTF_8)))
// To learn more about date formatting:
// https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/format/DateTimeFormatter.html
.set("date_field", LocalDate.parse("2020-03-19").toString()) // ISO_LOCAL_DATE
.set(
"datetime_field",
LocalDateTime.parse("2020-03-19T20:41:25.123").toString()) // ISO_LOCAL_DATE_TIME
.set("time_field", LocalTime.parse("20:41:25.123").toString()) // ISO_LOCAL_TIME
.set(
"timestamp_field",
Instant.parse("2020-03-20T03:41:42.123Z").toString()) // ISO_INSTANT
// To learn more about the geography Well-Known Text (WKT) format:
// https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry
.set("geography_field", "POINT(30 10)")
// An array has its mode set to REPEATED.
.set("array_field", Arrays.asList(1, 2, 3, 4))
// Any class can be written as a STRUCT as long as all the fields in the
// schema are present and they are encoded correctly as BigQuery types.
.set(
"struct_field",
Stream.of(
new SimpleEntry<>("string_value", "Text 🌱🌳🌍"),
new SimpleEntry<>("int64_value", "42"))
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)));
return row;
}
}
bigquery_data = [{
'string': 'abc',
'bytes': base64.b64encode(b'\xab\xac'),
'integer': 5,
'float': 0.5,
'numeric': Decimal('5'),
'boolean': True,
'timestamp': '2018-12-31 12:44:31.744957 UTC',
'date': '2018-12-31',
'time': '12:44:31',
'datetime': '2018-12-31T12:44:31',
'geography': 'POINT(30 10)'
}]
As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports high-precision decimal numbers (precision of 38 digits, scale of 9 digits). The GEOGRAPHY data type works with Well-Known Text (See https://en.wikipedia.org/wiki/Well-known_text format for reading and writing to BigQuery. BigQuery IO requires values of BYTES datatype to be encoded using base64 encoding when writing to BigQuery. When bytes are read from BigQuery they are returned as base64-encoded strings.
As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports high-precision decimal numbers (precision of 38 digits, scale of 9 digits). The GEOGRAPHY data type works with Well-Known Text (See https://en.wikipedia.org/wiki/Well-known_text format for reading and writing to BigQuery. BigQuery IO requires values of BYTES datatype to be encoded using base64 encoding when writing to BigQuery. When bytes are read from BigQuery they are returned as base64-encoded bytes.
Reading from BigQuery
BigQueryIO allows you to read from a BigQuery table, or to execute a SQL query and read the results. By default, Beam invokes a BigQuery export request when you apply a BigQueryIO read transform. However, the Beam SDK for Java also supports using the BigQuery Storage Read API to read directly from BigQuery storage. See Using the Storage Read API for more information.
Beam’s use of BigQuery APIs is subject to BigQuery’s Quota and Pricing policies.
The Beam SDK for Java has two BigQueryIO read methods. Both of these methods allow you to read from a table, or read fields using a query string.
read(SerializableFunction)
reads Avro-formatted records and uses a specified parsing function to parse them into aPCollection
of custom typed objects. Each element in thePCollection
represents a single row in the table. The example code for reading with a query string shows how to useread(SerializableFunction)
.readTableRows
returns aPCollection
of BigQueryTableRow
objects. Each element in thePCollection
represents a single row in the table. Integer values in theTableRow
objects are encoded as strings to match BigQuery’s exported JSON format. This method is convenient, but can be 2-3 times slower in performance compared toread(SerializableFunction)
. The example code for reading from a table shows how to usereadTableRows
.
Note: BigQueryIO.read()
is deprecated as of Beam SDK 2.2.0. Instead, use
read(SerializableFunction<SchemaAndRecord, T>)
to parse BigQuery rows from
Avro GenericRecord
into your custom type, or use readTableRows()
to parse
them into JSON TableRow
objects.
To read from a BigQuery table using the Beam SDK for Python, apply a ReadFromBigQuery
transform. ReadFromBigQuery
returns a PCollection
of dictionaries,
where each element in the PCollection
represents a single row in the table.
Integer values in the TableRow
objects are encoded as strings to match
BigQuery’s exported JSON format.
Note: BigQuerySource()
is deprecated as of Beam SDK 2.25.0. Before 2.25.0, to read from
a BigQuery table using the Beam SDK, apply a Read
transform on a BigQuerySource
. For example,
beam.io.Read(beam.io.BigQuerySource(table_spec))
.
Reading from a table
To read an entire BigQuery table, use the from
method with a BigQuery table
name. This example uses readTableRows
.
To read an entire BigQuery table, use the table
parameter with the BigQuery
table name.
The following code reads an entire table that contains weather station data and
then extracts the max_temperature
column.
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
class BigQueryReadFromTable {
public static PCollection<MyData> readFromTable(
String project, String dataset, String table, Pipeline pipeline) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// Pipeline pipeline = Pipeline.create();
PCollection<MyData> rows =
pipeline
.apply(
"Read from BigQuery query",
BigQueryIO.readTableRows().from(String.format("%s:%s.%s", project, dataset, table)))
.apply(
"TableRows to MyData",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
return rows;
}
}
Reading with a query string
If you don’t want to read an entire table, you can supply a query string with
the fromQuery
method.
If you don’t want to read an entire table, you can supply a query string to
ReadFromBigQuery
by specifying the query
parameter.
The following code uses a SQL query to only read the max_temperature
column.
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
class BigQueryReadFromQuery {
public static PCollection<MyData> readFromQuery(
String project, String dataset, String table, Pipeline pipeline) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// Pipeline pipeline = Pipeline.create();
PCollection<MyData> rows =
pipeline
.apply(
"Read from BigQuery query",
BigQueryIO.readTableRows()
.fromQuery(String.format("SELECT * FROM `%s.%s.%s`", project, dataset, table))
.usingStandardSql())
.apply(
"TableRows to MyData",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
return rows;
}
}
You can also use BigQuery’s standard SQL dialect with a query string, as shown in the following example:
max_temperatures = (
pipeline
| 'QueryTableStdSQL' >> beam.io.ReadFromBigQuery(
query='SELECT max_temperature FROM '\
'`clouddataflow-readonly.samples.weather_stations`',
use_standard_sql=True)
# Each row is a dictionary where the keys are the BigQuery columns
| beam.Map(lambda elem: elem['max_temperature']))
Query execution project
By default the pipeline executes the query in the Google Cloud project associated with the pipeline (in case of the Dataflow runner it’s the project where the pipeline runs). There are cases where the query execution project should be different from the pipeline project. If you use Java SDK, you can define the query execution project by setting the pipeline option “bigQueryProject” to the desired Google Cloud project id.
Using the Storage Read API
The BigQuery Storage API allows you to directly access tables in BigQuery storage, and supports features such as column selection and predicate filter push-down which can allow more efficient pipeline execution.
The Beam SDK for Java supports using the BigQuery Storage API when reading from BigQuery. SDK versions before 2.25.0 support the BigQuery Storage API as an experimental feature and use the pre-GA BigQuery Storage API surface. Callers should migrate pipelines which use the BigQuery Storage API to use SDK version 2.25.0 or later.
The Beam SDK for Python supports the BigQuery Storage API. Enable it
by passing method=DIRECT_READ
as a parameter to ReadFromBigQuery
.
Updating your code
Use the following methods when you read from a table:
- Required: Specify withMethod(Method.DIRECT_READ) to use the BigQuery Storage API for the read operation.
- Optional: To use features such as column projection and column filtering, you must specify withSelectedFields and withRowRestriction respectively.
The following code snippet reads from a table. This example is from the BigQueryTornadoes
example.
When the example’s read method option is set to DIRECT_READ
, the pipeline uses
the BigQuery Storage API and column projection to read public samples of weather
data from a BigQuery table. You can view the full source code on
GitHub.
import java.util.Arrays;
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
class BigQueryReadFromTableWithBigQueryStorageAPI {
public static PCollection<MyData> readFromTableWithBigQueryStorageAPI(
String project, String dataset, String table, Pipeline pipeline) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// Pipeline pipeline = Pipeline.create();
PCollection<MyData> rows =
pipeline
.apply(
"Read from BigQuery table",
BigQueryIO.readTableRows()
.from(String.format("%s:%s.%s", project, dataset, table))
.withMethod(Method.DIRECT_READ)
.withSelectedFields(
Arrays.asList(
"string_field",
"int64_field",
"float64_field",
"numeric_field",
"bool_field",
"bytes_field",
"date_field",
"datetime_field",
"time_field",
"timestamp_field",
"geography_field",
"array_field",
"struct_field")))
.apply(
"TableRows to MyData",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
return rows;
}
}
The following code snippet reads with a query string.
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
class BigQueryReadFromQueryWithBigQueryStorageAPI {
public static PCollection<MyData> readFromQueryWithBigQueryStorageAPI(
String project, String dataset, String table, String query, Pipeline pipeline) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// Pipeline pipeline = Pipeline.create();
/*
String query = String.format("SELECT\n" +
" string_field,\n" +
" int64_field,\n" +
" float64_field,\n" +
" numeric_field,\n" +
" bool_field,\n" +
" bytes_field,\n" +
" date_field,\n" +
" datetime_field,\n" +
" time_field,\n" +
" timestamp_field,\n" +
" geography_field,\n" +
" array_field,\n" +
" struct_field\n" +
"FROM\n" +
" `%s:%s.%s`", project, dataset, table)
*/
PCollection<MyData> rows =
pipeline
.apply(
"Read from BigQuery table",
BigQueryIO.readTableRows()
.fromQuery(query)
.usingStandardSql()
.withMethod(Method.DIRECT_READ))
.apply(
"TableRows to MyData",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
return rows;
}
}
Writing to BigQuery
BigQueryIO lets you write to BigQuery tables. If you are using the Beam SDK for Java, you can write different rows to different tables. The Beam SDK for Java also supports using the BigQuery Storage Write API to write directly to BigQuery storage. For more information, see Using the Storage Write API.
BigQueryIO write transforms use APIs that are subject to BigQuery’s Quota and Pricing policies.
When you apply a write transform, you must provide the following information for the destination table(s):
- The table name.
- The destination table’s create disposition. The create disposition specifies whether the destination table must exist or can be created by the write operation.
- The destination table’s write disposition. The write disposition specifies whether the data you write replaces an existing table, appends rows to an existing table, or writes only to an empty table.
In addition, if your write operation creates a new BigQuery table, you must also supply a table schema for the destination table.
Create disposition
The create disposition controls whether or not your BigQuery write operation should create a table if the destination table does not exist.
Use .withCreateDisposition
to specify the create disposition. Valid enum
values are:
Write.CreateDisposition.CREATE_IF_NEEDED
: Specifies that the write operation should create a new table if one does not exist. If you use this value, you must provide a table schema with thewithSchema
method.CREATE_IF_NEEDED
is the default behavior.Write.CreateDisposition.CREATE_NEVER
: Specifies that a table should never be created. If the destination table does not exist, the write operation fails.
Use the create_disposition
parameter to specify the create disposition. Valid
enum values are:
BigQueryDisposition.CREATE_IF_NEEDED
: Specifies that the write operation should create a new table if one does not exist. If you use this value, you must provide a table schema.CREATE_IF_NEEDED
is the default behavior.BigQueryDisposition.CREATE_NEVER
: Specifies that a table should never be created. If the destination table does not exist, the write operation fails.
If you specify CREATE_IF_NEEDED
as the create disposition and you don’t supply
a table schema, the transform might fail at runtime if the destination table does
not exist.
Write disposition
The write disposition controls how your BigQuery write operation applies to an existing table.
Use .withWriteDisposition
to specify the write disposition. Valid enum values
are:
Write.WriteDisposition.WRITE_EMPTY
: Specifies that the write operation should fail at runtime if the destination table is not empty.WRITE_EMPTY
is the default behavior.Write.WriteDisposition.WRITE_TRUNCATE
: Specifies that the write operation should replace an existing table. Any existing rows in the destination table are removed, and the new rows are added to the table.Write.WriteDisposition.WRITE_APPEND
: Specifies that the write operation should append the rows to the end of the existing table.
Use the write_disposition
parameter to specify the write disposition. Valid
enum values are:
BigQueryDisposition.WRITE_EMPTY
: Specifies that the write operation should fail at runtime if the destination table is not empty.WRITE_EMPTY
is the default behavior.BigQueryDisposition.WRITE_TRUNCATE
: Specifies that the write operation should replace an existing table. Any existing rows in the destination table are removed, and the new rows are added to the table.BigQueryDisposition.WRITE_APPEND
: Specifies that the write operation should append the rows to the end of the existing table.
When you use WRITE_EMPTY
, the check for whether or not the destination table
is empty can occur before the actual write operation. This check doesn’t
guarantee that your pipeline will have exclusive access to the table. Two
concurrent pipelines that write to the same output table with a write
disposition of WRITE_EMPTY
might start successfully, but both pipelines can
fail later when the write attempts happen.
Creating a table schema
If your BigQuery write operation creates a new table, you must provide schema information. The schema contains information about each field in the table. When updating a pipeline with a new schema, the existing schema fields must stay in the same order, or the pipeline will break, failing to write to BigQuery.
To create a table schema in Java, you can either use a TableSchema
object, or
use a string that contains a JSON-serialized TableSchema
object.
To create a table schema in Python, you can either use a TableSchema
object,
or use a string that defines a list of fields. Single string based schemas do
not support nested fields, repeated fields, or specifying a BigQuery mode for
fields (the mode is always set to NULLABLE
).
Using a TableSchema
To create and use a table schema as a TableSchema
object, follow these steps.
Create a list of
TableFieldSchema
objects. EachTableFieldSchema
object represents a field in the table.Create a
TableSchema
object and use thesetFields
method to specify your list of fields.Use the
withSchema
method to provide your table schema when you apply a write transform.
Create a
TableSchema
object.Create and append a
TableFieldSchema
object for each field in your table.Use the
schema
parameter to provide your table schema when you apply a write transform. Set the parameter’s value to theTableSchema
object.
The following example code shows how to create a TableSchema
for a table with
two fields (source and quote) of type string.
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
class BigQuerySchemaCreate {
public static TableSchema createSchema() {
// To learn more about BigQuery schemas:
// https://cloud.google.com/bigquery/docs/schemas
TableSchema schema =
new TableSchema()
.setFields(
Arrays.asList(
new TableFieldSchema()
.setName("string_field")
.setType("STRING")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("int64_field")
.setType("INT64")
.setMode("NULLABLE"),
new TableFieldSchema()
.setName("float64_field")
.setType("FLOAT64"), // default mode is "NULLABLE"
new TableFieldSchema().setName("numeric_field").setType("NUMERIC"),
new TableFieldSchema().setName("bool_field").setType("BOOL"),
new TableFieldSchema().setName("bytes_field").setType("BYTES"),
new TableFieldSchema().setName("date_field").setType("DATE"),
new TableFieldSchema().setName("datetime_field").setType("DATETIME"),
new TableFieldSchema().setName("time_field").setType("TIME"),
new TableFieldSchema().setName("timestamp_field").setType("TIMESTAMP"),
new TableFieldSchema().setName("geography_field").setType("GEOGRAPHY"),
new TableFieldSchema()
.setName("array_field")
.setType("INT64")
.setMode("REPEATED")
.setDescription("Setting the mode to REPEATED makes this an ARRAY<INT64>."),
new TableFieldSchema()
.setName("struct_field")
.setType("STRUCT")
.setDescription(
"A STRUCT accepts a custom data class, the fields must match the custom class fields.")
.setFields(
Arrays.asList(
new TableFieldSchema().setName("string_value").setType("STRING"),
new TableFieldSchema().setName("int64_value").setType("INT64")))));
return schema;
}
}
Using a string
To create and use a table schema as a string that contains JSON-serialized
TableSchema
object, follow these steps.
Create a string that contains a JSON-serialized
TableSchema
object.Use the
withJsonSchema
method to provide your table schema when you apply a write transform.
To create and use a table schema as a string, follow these steps.
Create a single comma separated string of the form “field1:type1,field2:type2,field3:type3” that defines a list of fields. The type should specify the field’s BigQuery type.
Use the
schema
parameter to provide your table schema when you apply a write transform. Set the parameter’s value to the string.
The following example shows how to use a string to specify the same table schema as the previous example.
Setting the insertion method
BigQueryIO supports two methods of inserting data into BigQuery: load jobs and streaming inserts. Each insertion method provides different tradeoffs of cost, quota, and data consistency. See the BigQuery documentation for different data ingestion options (specifically, load jobs and streaming inserts) for more information about these tradeoffs.
BigQueryIO chooses a default insertion method based on the input PCollection
.
You can use withMethod
to specify the desired insertion method. See
Write.Method
for the list of the available methods and their restrictions.
BigQueryIO chooses a default insertion method based on the input PCollection
.
You can use method
to specify the desired insertion method. See
WriteToBigQuery
for the list of the available methods and their restrictions.
BigQueryIO uses load jobs in the following situations:
- When you apply a BigQueryIO write transform to a bounded
PCollection
. - When you specify load jobs as the insertion method using
BigQueryIO.write().withMethod(FILE_LOADS)
.
- When you apply a BigQueryIO write transform to a bounded
PCollection
. - When you specify load jobs as the insertion method using
WriteToBigQuery(method='FILE_LOADS')
.
Note: If you use batch loads in a streaming pipeline:
You must use withTriggeringFrequency
to specify a triggering frequency for
initiating load jobs. Be careful about setting the frequency such that your
pipeline doesn’t exceed the BigQuery load job quota limit.
You can either use withNumFileShards
to explicitly set the number of file
shards written, or use withAutoSharding
to enable dynamic sharding (starting
2.29.0 release) and the number of shards may be determined and changed at
runtime. The sharding behavior depends on the runners.
You must use triggering_frequency
to specify a triggering frequency for
initiating load jobs. Be careful about setting the frequency such that your
pipeline doesn’t exceed the BigQuery load job quota limit.
You can set with_auto_sharding=True
to enable dynamic sharding (starting
2.29.0 release). The number of shards may be determined and changed at runtime.
The sharding behavior depends on the runners.
BigQueryIO uses streaming inserts in the following situations:
- When you apply a BigQueryIO write transform to an unbounded
PCollection
. - When you specify streaming inserts as the insertion method using
BigQueryIO.write().withMethod(STREAMING_INSERTS)
.
- When you apply a BigQueryIO write transform to an unbounded
PCollection
. - When you specify streaming inserts as the insertion method using
WriteToBigQuery(method='STREAMING_INSERTS')
.
Note: Streaming inserts by default enables BigQuery best-effort deduplication mechanism.
You can disable that by setting ignoreInsertIds
. The quota limitations
are different when deduplication is enabled vs. disabled.
Streaming inserts applies a default sharding for each table destination. You can
use withAutoSharding
(starting 2.28.0 release) to enable dynamic sharding and
the number of shards may be determined and changed at runtime. The sharding
behavior depends on the runners.
Note: Streaming inserts by default enables BigQuery best-effort deduplication mechanism.
You can disable that by setting ignore_insert_ids=True
. The quota limitations
are different when deduplication is enabled vs. disabled.
Streaming inserts applies a default sharding for each table destination. You can
set with_auto_sharding=True
(starting 2.29.0 release) to enable dynamic
sharding. The number of shards may be determined and changed at runtime. The
sharding behavior depends on the runners.
Writing to a table
To write to a BigQuery table, apply either a writeTableRows
or write
transform.
To write to a BigQuery table, apply the WriteToBigQuery
transform.
WriteToBigQuery
supports both batch mode and streaming mode. You must apply
the transform to a PCollection
of dictionaries. In general, you’ll need to use
another transform, such as ParDo
, to format your output data into a
collection.
The following examples use this PCollection
that contains quotes.
The writeTableRows
method writes a PCollection
of BigQuery TableRow
objects to a BigQuery table. Each element in the PCollection
represents a
single row in the table. This example uses writeTableRows
to write elements to a
PCollection<TableRow>
. The write operation creates a table if needed. If the
table already exists, it is replaced.
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.values.PCollection;
class BigQueryWriteToTable {
public static void writeToTable(
String project,
String dataset,
String table,
TableSchema schema,
PCollection<TableRow> rows) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// TableSchema schema = new TableSchema().setFields(Arrays.asList(...));
// Pipeline pipeline = Pipeline.create();
// PCollection<TableRow> rows = ...
rows.apply(
"Write to BigQuery",
BigQueryIO.writeTableRows()
.to(String.format("%s:%s.%s", project, dataset, table))
.withSchema(schema)
// For CreateDisposition:
// - CREATE_IF_NEEDED (default): creates the table if it doesn't exist, a schema is
// required
// - CREATE_NEVER: raises an error if the table doesn't exist, a schema is not needed
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
// For WriteDisposition:
// - WRITE_EMPTY (default): raises an error if the table is not empty
// - WRITE_APPEND: appends new rows to existing rows
// - WRITE_TRUNCATE: deletes the existing rows before writing
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
// pipeline.run().waitUntilFinish();
}
}
The following example code shows how to apply a WriteToBigQuery
transform to
write a PCollection
of dictionaries to a BigQuery table. The write operation
creates a table if needed. If the table already exists, it is replaced.
The write
transform writes a PCollection
of custom typed objects to a BigQuery
table. Use .withFormatFunction(SerializableFunction)
to provide a formatting
function that converts each input element in the PCollection
into a
TableRow
. This example uses write
to write a PCollection<String>
. The
write operation creates a table if needed. If the table already exists, it is
replaced.
quotes.apply(
BigQueryIO.<Quote>write()
.to(tableSpec)
.withSchema(tableSchema)
.withFormatFunction(
(Quote elem) ->
new TableRow().set("source", elem.source).set("quote", elem.quote))
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
When you use streaming inserts, you can decide what to do with failed records.
You can either keep retrying, or return the failed records in a separate
PCollection
using the WriteResult.getFailedInserts()
method.
Using the Storage Write API
Starting with version 2.36.0 of the Beam SDK for Java, you can use the BigQuery Storage Write API from the BigQueryIO connector.
Also after version 2.47.0 of Beam SDK for Python, SDK supports BigQuery Storage Write API.
BigQuery Storage Write API for Python SDK currently has some limitations on supported data types. As this method makes use of cross-language transforms, we are limited to the types supported at the cross-language boundary. For example, apache_beam.utils.timestamp.Timestamp
is needed to write a TIMESTAMP
BigQuery type. Also, some types (e.g. DATETIME
) are not supported yet. For more details, please refer to the full type mapping.
Note: If you want to run WriteToBigQuery with Storage Write API from the source code, you need to run ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build
to build the expansion-service jar. If you are running from a released Beam SDK, the jar is already included.
Exactly-once semantics
To write to BigQuery using the Storage Write API, set withMethod
to
Method.STORAGE_WRITE_API
.
Here’s an example transform that writes to BigQuery using the Storage Write API and exactly-once semantics:
If you want to change the behavior of BigQueryIO so that all the BigQuery sinks
for your pipeline use the Storage Write API by default, set the
UseStorageWriteApi
option.
If your pipeline needs to create the table (in case it doesn’t exist and you
specified the create disposition as CREATE_IF_NEEDED
), you must provide a
table schema. The API uses the schema to validate data and convert it to a
binary protocol.
For streaming pipelines, you need to set two additional parameters: the number of streams and the triggering frequency.
The number of streams defines the parallelism of the BigQueryIO Write transform
and roughly corresponds to the number of Storage Write API streams that the
pipeline uses. You can set it explicitly on the transform via
withNumStorageWriteApiStreams
or provide the numStorageWriteApiStreams
option to the pipeline as defined in
BigQueryOptions
.
Please note this is only supported for streaming pipelines.
Triggering frequency determines how soon the data is visible for querying in
BigQuery. You can explicitly set it via
withTriggeringFrequency
or specify the number of seconds by setting the
storageWriteApiTriggeringFrequencySec
option.
The combination of these two parameters affects the size of the batches of rows that BigQueryIO creates before calling the Storage Write API. Setting the frequency too high can result in smaller batches, which can affect performance. As a general rule, a single stream should be able to handle throughput of at least 1Mb per second. Creating exclusive streams is an expensive operation for the BigQuery service, so you should use only as many streams as needed for your use case. Triggering frequency in single-digit seconds is a good choice for most pipelines.
Similar to streaming inserts, STORAGE_WRITE_API
supports dynamically determining
the number of parallel streams to write to BigQuery (starting 2.42.0). You can
explicitly enable this using withAutoSharding
.
STORAGE_WRITE_API
defaults to dynamic sharding when
numStorageWriteApiStreams
is set to 0 or is unspecified.
When using STORAGE_WRITE_API
, the PCollection
returned by
WriteResult.getFailedStorageApiInserts
contains the rows that failed to be written to the Storage Write API sink.
At-least-once semantics
If your use case allows for potential duplicate records in the target table, you
can use the
STORAGE_API_AT_LEAST_ONCE
method. This method doesn’t persist the records to be written to
BigQuery into its shuffle storage, which is needed to provide the exactly-once semantics
of the STORAGE_WRITE_API
method. Therefore, for most pipelines, using this method is often
less expensive and results in lower latency.
If you use STORAGE_API_AT_LEAST_ONCE
, you don’t need to
specify the number of streams, and you can’t specify the triggering frequency.
Auto sharding is not applicable for STORAGE_API_AT_LEAST_ONCE
.
When using STORAGE_API_AT_LEAST_ONCE
, the PCollection
returned by
WriteResult.getFailedStorageApiInserts
contains the rows that failed to be written to the Storage Write API sink.
Tune the Storage Write API
By default, the BigQueryIO Write transform uses Storage Write API settings that are reasonable for most pipelines.
If you see performance issues, such as stuck pipelines, quota limit errors, or monotonically increasing backlog, consider tuning the following pipeline options when you run the job:
Option (Java/Python) | Description |
---|---|
| If the write mode is STORAGE_API_AT_LEAST_ONCE and the
useStorageApiConnectionPool option is true , this
option sets the maximum number of connections that each pool creates, per
worker and region. If your pipeline writes many dynamic destinations (more
than 20), and you see performance issues or append operations are
competing for streams, then consider increasing this value. |
| If the write mode is In practice, the minimum number of connections created is the minimum
of this option and |
| If the write mode is STORAGE_API_AT_LEAST_ONCE , this option
sets the number of stream append clients allocated per worker and
destination. For high-volume pipelines with a large number of workers,
a high value can cause the job to exceed the BigQuery connection quota.
For most low- to mid-volume pipelines, the default value is sufficient. |
| Maximum size of a single append to the Storage Write API (best effort). |
| Maximum record count of a single append to the Storage Write API (best effort). |
| Expected maximum number of inflight messages per connection. |
| If If you enable multiplexing, consider setting the following options to tune the number of connections created by the connection pool:
For more information, see Connection pool management in the BigQuery documentation. |
Quotas
Before using the Storage Write API, be aware of the BigQuery Storage Write API quotas.
Using dynamic destinations
You can use the dynamic destinations feature to write elements in a
PCollection
to different BigQuery tables, possibly with different schemas.
The dynamic destinations feature groups your user type by a user-defined destination key, uses the key to compute a destination table and/or schema, and writes each group’s elements to the computed destination.
In addition, you can also write your own types that have a mapping function to
TableRow
, and you can use side inputs in all DynamicDestinations
methods.
To use dynamic destinations, you must create a DynamicDestinations
object and
implement the following methods:
getDestination
: Returns an object thatgetTable
andgetSchema
can use as the destination key to compute the destination table and/or schema.getTable
: Returns the table (as aTableDestination
object) for the destination key. This method must return a unique table for each unique destination.getSchema
: Returns the table schema (as aTableSchema
object) for the destination key.
Then, use write().to
with your DynamicDestinations
object. This example
uses a PCollection
that contains weather data and writes the data into a
different table for each year.
/*
@DefaultCoder(AvroCoder.class)
static class WeatherData {
final long year;
final long month;
final long day;
final double maxTemp;
public WeatherData() {
this.year = 0;
this.month = 0;
this.day = 0;
this.maxTemp = 0.0f;
}
public WeatherData(long year, long month, long day, double maxTemp) {
this.year = year;
this.month = month;
this.day = day;
this.maxTemp = maxTemp;
}
}
*/
PCollection<WeatherData> weatherData =
p.apply(
BigQueryIO.read(
(SchemaAndRecord elem) -> {
GenericRecord record = elem.getRecord();
return new WeatherData(
(Long) record.get("year"),
(Long) record.get("month"),
(Long) record.get("day"),
(Double) record.get("max_temperature"));
})
.fromQuery(
"SELECT year, month, day, max_temperature "
+ "FROM [apache-beam-testing.samples.weather_stations] "
+ "WHERE year BETWEEN 2007 AND 2009")
.withCoder(AvroCoder.of(WeatherData.class)));
// We will send the weather data into different tables for every year.
weatherData.apply(
BigQueryIO.<WeatherData>write()
.to(
new DynamicDestinations<WeatherData, Long>() {
@Override
public Long getDestination(ValueInSingleWindow<WeatherData> elem) {
return elem.getValue().year;
}
@Override
public TableDestination getTable(Long destination) {
return new TableDestination(
new TableReference()
.setProjectId(writeProject)
.setDatasetId(writeDataset)
.setTableId(writeTable + "_" + destination),
"Table for year " + destination);
}
@Override
public TableSchema getSchema(Long destination) {
return new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema()
.setName("year")
.setType("INTEGER")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("month")
.setType("INTEGER")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("day")
.setType("INTEGER")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("maxTemp")
.setType("FLOAT")
.setMode("NULLABLE")));
}
})
.withFormatFunction(
(WeatherData elem) ->
new TableRow()
.set("year", elem.year)
.set("month", elem.month)
.set("day", elem.day)
.set("maxTemp", elem.maxTemp))
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
fictional_characters_view = beam.pvalue.AsDict(
pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),
('Obi Wan Kenobi', True)]))
def table_fn(element, fictional_characters):
if element in fictional_characters:
return 'my_dataset.fictional_quotes'
else:
return 'my_dataset.real_quotes'
quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery(
table_fn,
schema=table_schema,
table_side_inputs=(fictional_characters_view, ),
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
Using time partitioning
BigQuery time partitioning divides your table into smaller partitions, which is called a partitioned table. Partitioned tables make it easier for you to manage and query your data.
To use BigQuery time partitioning, use one of these two methods:
withTimePartitioning
: This method takes aTimePartitioning
class, and is only usable if you are writing to a single table.withJsonTimePartitioning
: This method is the same aswithTimePartitioning
, but takes a JSON-serialized String object.
This example generates one partition per day.
weatherData.apply(
BigQueryIO.<WeatherData>write()
.to(tableSpec + "_partitioning")
.withSchema(tableSchema)
.withFormatFunction(
(WeatherData elem) ->
new TableRow()
.set("year", elem.year)
.set("month", elem.month)
.set("day", elem.day)
.set("maxTemp", elem.maxTemp))
// NOTE: an existing table without time partitioning set up will not work
.withTimePartitioning(new TimePartitioning().setType("DAY"))
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
Limitations
BigQueryIO currently has the following limitations.
You can’t sequence the completion of a BigQuery write with other steps of your pipeline.
If you are using the Beam SDK for Python, you might have import size quota issues if you write a very large dataset. As a workaround, you can partition the dataset (for example, using Beam’s
Partition
transform) and write to multiple BigQuery tables. The Beam SDK for Java does not have this limitation as it partitions your dataset for you.When you load data into BigQuery, these limits are applied. By default, BigQuery uses a shared pool of slots to load data. This means that the available capacity is not guaranteed, and your load may be queued until a slot becomes available. If a slot does not become available within 6 hours, the load will fail due to the limits set by BigQuery. To avoid this situation, it is highly recommended that you use BigQuery reservations, which ensure that your load does not get queued and fail due to capacity issues.
Additional examples
You can find additional examples that use BigQuery in Beam’s examples directories.
Java cookbook examples
These examples are from the Java cookbook examples directory.
BigQueryTornadoes reads the public samples of weather data from BigQuery, counts the number of tornadoes that occur in each month, and writes the results to a BigQuery table.
CombinePerKeyExamples reads the public Shakespeare data from BigQuery, and for each word in the dataset that exceeds a given length, generates a string containing the list of play names in which that word appears. The pipeline then writes the results to a BigQuery table.
FilterExamples reads public samples of weather data from BigQuery, performs a projection on the data, finds the global mean of the temperature readings, filters on readings for a single given month, and outputs only data (for that month) that has a mean temp smaller than the derived global mean.
JoinExamples reads a sample of the GDELT “world event” from BigQuery and joins the event
action
country code against a table that maps country codes to country names.MaxPerKeyExamples reads the public samples of weather data from BigQuery, finds the maximum temperature for each month, and writes the results to a BigQuery table.
TriggerExample performs a streaming analysis of traffic data from San Diego freeways. The pipeline looks at the data coming in from a text file and writes the results to a BigQuery table.
Java complete examples
These examples are from the Java complete examples directory.
AutoComplete computes the most popular hash tags for every prefix, which can be used for auto-completion. The pipeline can optionally write the results to a BigQuery table.
StreamingWordExtract reads lines of text, splits each line into individual words, capitalizes those words, and writes the output to a BigQuery table.
TrafficMaxLaneFlow reads traffic sensor data, finds the lane that had the highest recorded flow, and writes the results to a BigQuery table.
TrafficRoutes reads traffic sensor data, calculates the average speed for each window and looks for slowdowns in routes, and writes the results to a BigQuery table.
Python cookbook examples
These examples are from the Python cookbook examples directory.
BigQuery schema creates a
TableSchema
with nested and repeated fields, generates data with nested and repeated fields, and writes the data to a BigQuery table.BigQuery side inputs uses BigQuery sources as side inputs. It illustrates how to insert side-inputs into transforms in three different forms: as a singleton, as a iterator, and as a list.
BigQuery tornadoes reads from a BigQuery table that has the ‘month’ and ’tornado’ fields as part of the table schema, computes the number of tornadoes in each month, and outputs the results to a BigQuery table.
BigQuery filters reads weather station data from a BigQuery table, manipulates BigQuery rows in memory, and writes the results to a BigQuery table.
Last updated on 2025/10/23
Have you found everything you were looking for?
Was it all useful and clear? Is there anything that you would like to change? Let us know!