An Introduction To Data Stream Query Processing
Neil Conway <nconway@aminsight.com>
Amalgamated Insight, Inc.
May 24, 2007
Neil Conway (AmInsight)
Data Stream Query Processing
May 24, 2007
Outline
1. The Need For Data Stream Processing
2. Stream Query Languages
3. Query Processing Techniques For Streams
   System Architecture
   Shared Evaluation
   Adaptive Tuple Routing
   Overload Handling
4. Current Choices For A DSMS
   Open Source
   Proprietary
5. Demo
6. Q&A
The Need For Data Stream Processing
What's wrong with database systems? Nothing, but they aren't the right solution to every problem.
What are some problems for which a traditional DBMS is an awkward fit?
Financial Analysis
Electronic trading is now commonplace
Trading volume continues to increase rapidly
Algorithmic trading: detect advantageous market conditions, automatically execute trades
Latency is key
Visualization
A hard problem in itself
Typical Queries
5-minute rolling average, volume-weighted average price (VWAP)
Comparison between sector averages and portfolio averages over time
Implement models provided by quantitative analysis
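The rolling-average query can be sketched in a few lines of Python. The talk's own query dialect does not run on its own, so Python stands in here. This is a minimal illustration, not any DSMS's implementation. It assumes timestamps in seconds that arrive in order, and the class name `RollingAverage` is hypothetical. The point is that the window is maintained incrementally: each new price is added and expired prices are subtracted, so nothing is rescanned from stored data.

```python
from collections import deque


class RollingAverage:
    """Average price over the last `width` seconds (hypothetical helper)."""

    def __init__(self, width: float = 300.0) -> None:
        self.width = width
        self.window: deque[tuple[float, float]] = deque()  # (timestamp, price)
        self.total = 0.0

    def add(self, ts: float, price: float) -> float:
        # Assumes timestamps arrive in non-decreasing order.
        self.window.append((ts, price))
        self.total += price
        # Evict tuples that have fallen out of the window (ts - width, ts].
        while self.window and self.window[0][0] <= ts - self.width:
            _, old_price = self.window.popleft()
            self.total -= old_price
        return self.total / len(self.window)
```

For example, `RollingAverage(300)` fed prices 10.0 at t=0, 20.0 at t=100 and 30.0 at t=301 reports 25.0 after the last trade, because the t=0 trade has expired.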
Network Monitoring
Network volume continues to increase rapidly
Custom solutions are possible, but roll-your-own is expensive
Ad-hoc queries would be nice
Can we build generic infrastructure for these kinds of monitoring applications?
Sensor Networks
Pervasive Sensors
"As the cost of micro sensors continues to decline over the next decade, we could see a world in which everything of material significance gets sensor-tagged." (Mike Stonebraker)
Military applications: real-time command and control
Healthcare
Habitat monitoring
Manufacturing
Other Examples
Real-Time Decision Support
Turnaround time for traditional data warehouses is often too slow
Business Activity Monitoring (BAM)
Fraud Detection
Sophisticated, cross-channel fraud
Real-time
Online Gaming
Detect malicious behavior
Monitor quality of service
Data Stream Management Systems
Database Systems
Mostly static data, ad-hoc one-time queries
Fire the queries at the data, return result sets
Store and query
Focus: concurrent reads & writes, efficient use of I/O, maximize transaction throughput, transactional consistency, historical analysis
Data Stream Systems
Mostly transient data, continuous queries
Fire the data at the queries, incrementally update result streams
Data rates often exceed disk throughput
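The inversion between the two models fits in a toy Python example. A one-time query scans stored rows once. A continuous query is registered first and then updates its answer as each tuple is pushed to it. The function and class names are hypothetical.

```python
from typing import Callable


def one_time_count(rows: list[dict], pred: Callable[[dict], bool]) -> int:
    # DBMS style: the data is stored; the query is fired at it once.
    return sum(1 for r in rows if pred(r))


class ContinuousCount:
    # DSMS style: the query is stored; each arriving tuple is fired at it.
    def __init__(self, pred: Callable[[dict], bool]) -> None:
        self.pred = pred
        self.count = 0

    def push(self, row: dict) -> int:
        if self.pred(row):
            self.count += 1  # incremental update, no rescan of old tuples
        return self.count  # current value of the result stream
```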
Complex Event Processing (CEP)
Data stream processing emerged from the database community
Early 90s: active databases with triggers
Complex Event Processing is another approach to the same problems
Different nomenclature and background
Often similar in practice
Data Streams
A stream is an infinite sequence of <tuple, timestamp> pairs
Append-only
New type of database object
The timestamp defines a total order over the tuples in a stream
In practice: require that stream tuples have a special CQTIME column
Different approaches to building stream processing systems
This talk: relation-oriented DSMS. Specifically, TelegraphCQ, AmInsight, StreamBase, ...
CREATE STREAM
Exactly 1 column must have a CQTIME constraint
CQTIME can be system-generated or user-provided
With user-provided timestamps, system must cope with out-of-order tuples
Slack specifies the maximum out-of-orderness
Example Query
CREATE STREAM trades (
    symbol varchar(5),
    price  real,
    volume integer,
    tstamp timestamp CQTIME USER GENERATED SLACK 1 minute
) TYPE UNARCHIVED;
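One plausible way to honor a SLACK bound is a reorder buffer. Tuples are held in a min-heap keyed on timestamp and released only once the largest timestamp seen has moved `slack` past them. The Python sketch below assumes that tuples arriving later than that bound are simply dropped and counted. How a real system treats such tuples is not specified here, and `SlackBuffer` is a hypothetical name. With `SLACK 1 minute`, `slack` would be 60 seconds.

```python
import heapq


class SlackBuffer:
    """Reorder a stream whose timestamps may be out of order by up to `slack`.

    Assumption: a tuple is released once the highest timestamp seen exceeds
    its own by at least `slack`; tuples arriving later than that are dropped.
    """

    def __init__(self, slack: float) -> None:
        self.slack = slack
        self.heap: list[tuple[float, int, object]] = []
        self.max_ts = float("-inf")
        self.seq = 0  # tie-breaker so payloads are never compared
        self.dropped = 0

    def push(self, ts: float, payload: object) -> list[tuple[float, object]]:
        watermark = self.max_ts - self.slack
        if ts < watermark:
            self.dropped += 1  # too late: beyond the allowed slack
            return []
        heapq.heappush(self.heap, (ts, self.seq, payload))
        self.seq += 1
        self.max_ts = max(self.max_ts, ts)
        return self._release(self.max_ts - self.slack)

    def _release(self, watermark: float) -> list[tuple[float, object]]:
        # Emit, in timestamp order, every buffered tuple at or below the watermark.
        out = []
        while self.heap and self.heap[0][0] <= watermark:
            ts, _, payload = heapq.heappop(self.heap)
            out.append((ts, payload))
        return out
```

The cost of this design is latency. Every tuple can be delayed by up to `slack` before downstream windows see it, so the slack should be as small as the data source allows.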
Types of Streams
Raw Streams
Stream tuples are injected into the system by an external data source
E.g. stock tickers, sensor data, network interface, ...
Both push and pull models have been explored
Derived Streams
Defined by a query expression that yields a stream
Archived Streams
Allows historical and real-time stream content to be combined in a single database object
Language Design Philosophy
Pragmatism: relational query languages are well-established
Relational query evaluation techniques are well-understood
Everyone knows SQL
Therefore, add stream-oriented extensions to SQL
Pioneering work: CQL from the Stanford STREAM project
Kinds Of Operators
Relation → Relation: plain old SQL
Stream → Relation: periodically produce a relation from a stream
Relation → Stream: produce a stream from changes to a relation
Note that Stream → Stream operators are not provided.
Continuous Queries
Fundamental Difference
The result of a continuous query is an unbounded stream, not a finite relation
Typical Query
1. Split the infinite stream into pieces via windows (S → R)
2. Compute analysis for the current window, comparison with prior windows or historical data (R → R)
3. Convert the result of the analysis into a result stream (R → S); often implicit (use defaults)
Stream Relation Operators: Windows
Streams are infinite: at any given time, examine a finite subset
Apply a window operator to the stream to periodically produce visible sets of tuples
Properties of Sliding Windows
Range: width of the window. Units: rows or time.
Slide: how often to emit new visible sets. Units: rows or time.
Start: when to start emitting results.
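Range, slide and start together determine which windows a tuple falls into. The Python sketch below rests on three illustrative assumptions:

- time-based windows with integer timestamps;
- windows that close at `start`, `start + slide`, `start + 2*slide`, ...;
- a half-open extent (end - range, end] for each window.

The function names are also assumptions. The helper `tumbling_sums` shows the special case range = slide, which matches the per-second volume query in the next example.

```python
from collections import defaultdict


def window_ends(ts: int, range_: int, slide: int, start: int = 0) -> list[int]:
    """Return the end times of every window that contains timestamp `ts`.

    Windows close at start, start + slide, start + 2*slide, ... and each
    covers the half-open interval (end - range_, end].
    """
    if ts > start:
        k = -(-(ts - start) // slide)  # ceiling division: first window end >= ts
    else:
        k = 0
    ends = []
    end = start + k * slide
    while end - range_ < ts:  # this window still reaches back to ts
        ends.append(end)
        end += slide
    return ends


def tumbling_sums(trades: list[tuple[int, int]], width: int) -> dict[int, int]:
    """Sum volumes per tumbling window (range == slide == width)."""
    sums: defaultdict[int, int] = defaultdict(int)
    for ts, volume in trades:
        for end in window_ends(ts, width, width):
            sums[end] += volume
    return dict(sums)
```

With range 60 and slide 5, each tuple lands in range / slide = 12 windows. With range = slide it lands in exactly one.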
Example Query
Description
Every second, return the total volume of trades in the previous second.
Query
SELECT sum(volume) AS volume,
       advance_agg(qtime) AS windowtime
FROM   trades < VISIBLE 1 second ADVANCE 1 second >
Another Example
Description
Every 5 seconds, return the volume-weighted average price (VWAP) of MSFT for the last 1 minute of trades.
Query
SELECT sum(price * volume) / sum(volume) AS vwap,
       sum(volume) AS volume,
       advance_agg(qtime) AS windowtime
FROM   trades < VISIBLE 1 minute ADVANCE 5 seconds >
WHERE  symbol = 'MSFT'
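For comparison, here is what the MSFT VWAP query computes, written as a naive Python sketch that recomputes each window from scratch. It makes four assumptions: windows close every `advance` seconds starting at `advance`, each covers (end - visible, end], empty windows produce no row, and output stops at the window containing the last trade. `Trade` and `vwap_windows` are hypothetical names.

```python
from typing import NamedTuple


class Trade(NamedTuple):
    ts: int  # seconds
    symbol: str
    price: float
    volume: int


def vwap_windows(trades: list[Trade], symbol: str,
                 visible: int = 60, advance: int = 5) -> dict[int, tuple[float, int]]:
    """Mimic `<VISIBLE visible ADVANCE advance>` with `WHERE symbol = ...`.

    Returns {window_end: (vwap, total_volume)}.
    """
    rows = [t for t in trades if t.symbol == symbol]  # the WHERE clause
    if not rows:
        return {}
    last = max(t.ts for t in rows)
    out: dict[int, tuple[float, int]] = {}
    end = advance
    while end < last + advance:  # up to the window that holds the last trade
        in_window = [t for t in rows if end - visible < t.ts <= end]
        volume = sum(t.volume for t in in_window)
        if volume > 0:
            notional = sum(t.price * t.volume for t in in_window)
            out[end] = (notional / volume, volume)  # (vwap, volume)
        end += advance
    return out
```

A real engine would update per-window sums incrementally rather than rescanning. The naive form is kept here only to make the query's meaning explicit.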
More About Windows
Aggregation
Useful aggregate: advance_agg(CQTIME)
Timestamp that marks the end of the current window
Similar aggregates for the beginning and middle of the window might also be useful
Other Window Types
Landmark: fixed left edge, elastic right edge; periodically reset. (All stock trades after 9AM today.)
Partitioned: divide the stream into sub-streams based on partitioning key(s), then apply another S → R operator to the sub-streams.
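Both window types keep very different state. In the Python sketch below, the partitioned window applies a row-based window (the last N tuples) to each partition. The landmark window is a running sum whose `reset` would be triggered externally, e.g. at 9AM. Both classes are hypothetical illustrations, and the choice of a row-based inner window is an assumption.

```python
from collections import defaultdict, deque


class PartitionedRowsWindow:
    """Split the stream by key, then keep the last `rows` tuples per key."""

    def __init__(self, rows: int) -> None:
        self.parts: defaultdict[str, deque] = defaultdict(lambda: deque(maxlen=rows))

    def push(self, key: str, value: float) -> list[float]:
        part = self.parts[key]
        part.append(value)  # deque(maxlen=...) evicts the oldest automatically
        return list(part)  # current visible set for this partition


class LandmarkSum:
    """Fixed left edge, growing right edge; reset periodically."""

    def __init__(self) -> None:
        self.total = 0.0

    def reset(self) -> None:  # e.g. invoked by a scheduler at 9AM each day
        self.total = 0.0

    def push(self, value: float) -> float:
        self.total += value
        return self.total
```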
Relation Stream Operators
Types of Operators
ISTREAM: the tuples added to a relation
RSTREAM: all the tuples in a relation
DSTREAM: the tuples removed from a relation
Defaults
ISTREAM for queries without aggregation/grouping
RSTREAM for queries with aggregation/grouping
DSTREAM is rarely useful
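Treating successive versions of a relation as multisets makes the three operators easy to state. ISTREAM is the new version minus the old, DSTREAM is the old minus the new, and RSTREAM is the whole new version. Python's `collections.Counter` supports exactly this multiset subtraction. The function name is hypothetical, and the output is sorted only to make it deterministic.

```python
from collections import Counter


def relation_to_stream(prev: list, curr: list) -> dict[str, list]:
    """Compare two successive versions of a relation (multisets of tuples)
    and return what each Relation -> Stream operator would emit."""
    before, after = Counter(prev), Counter(curr)
    return {
        "ISTREAM": sorted((after - before).elements()),  # tuples added
        "DSTREAM": sorted((before - after).elements()),  # tuples removed
        "RSTREAM": sorted(after.elements()),  # the whole current relation
    }
```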
Mixed Joins
Common Requirement
Compare stream tuples with historical data
System must provide both tables and streams!
Elegantly modeled as a join between a table and a stream
Implementation
Stream is the right (outer) join operand; left (inner) operand is an arbitrary Postgres subplan
For each stream tuple, join against the non-continuous subplan
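The per-tuple strategy amounts to a nested-loop join driven by the stream. In this Python sketch the table side is a zero-argument callable that stands in for the non-continuous Postgres subplan and is re-executed for every stream tuple. The names and the single equality join key are assumptions for illustration.

```python
from typing import Callable, Iterable, Iterator


def mixed_join(stream: Iterable[dict],
               table_scan: Callable[[], Iterable[dict]],
               key: str) -> Iterator[tuple[dict, dict]]:
    """Join each arriving stream tuple against a one-time table subplan."""
    for s in stream:  # the stream drives the join
        for row in table_scan():  # non-continuous subplan, rerun per tuple
            if row[key] == s[key]:
                yield s, row
```

Re-running the subplan per tuple is simple and always sees the current table contents. A real planner could instead cache or index the table side when it is known not to change.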
Mixed Join Example
Description
Every 3 seconds, compute the total value of high-volume trades made on stocks in the S & P 500 in the past 5 seconds.
Example Query
SELECT T.symbol, sum(T.price * T.volume)
FROM   s_and_p_500 S, trades T < VISIBLE 5 sec ADVANCE 3 sec >
WHERE  T.symbol = S.symbol AND T.volume > 5000
GROUP BY T.symbol
Composing Streams
The tuples in a stream can be viewed as a series of events
E.g. "The temperature in the room is 20°, 25°, 30°, ..."
The output of a continuous query is another series of events, typically higher-level or more complex
E.g. "The room is on fire."
Therefore, streams can be composed in various ways:
Stream views
   Macro semantics
Derived streams
Subqueries
Active tables
Derived Streams
A derived stream is a database object defined by a persistent continuous query
Unlike a stream view, always active
Similar to a materialized view
Example Query
Description
Every 3 seconds, compute the volume-weighted average price (VWAP) for all stocks traded in the past 5 seconds.
Query
CREATE STREAM vwap (symbol varchar(5), vwap float, vtime timestamp cqtime) AS
    (SELECT symbol, sum(price * volume) / sum(volume), advance_agg(qtime)
     FROM trades < VISIBLE 5 seconds ADVANCE 3 seconds >
     GROUP BY symbol);
Subqueries
One-time subqueries can be used in continuous queries, of course
Continuous subqueries are planned and executed as independent queries
Essentially inline derived streams
Require that subqueries yielding streams specify CQTIME
Planned: WITH-clause subqueries
Active Tables
An active table is a table with an associated continuous query
Two modes of operation:
Append: new stream tuples are appended to the table at each window
Replace: at each new window, the previous table contents are truncated
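The two active-table modes differ only in what happens to old rows when a new window's results arrive. Here is a minimal Python sketch, with a hypothetical `ActiveTable` class whose `on_window` method is assumed to be called once per window with the continuous query's output:

```python
class ActiveTable:
    """A table fed by a continuous query, in 'append' or 'replace' mode."""

    def __init__(self, mode: str) -> None:
        if mode not in ("append", "replace"):
            raise ValueError(f"unknown mode: {mode}")
        self.mode = mode
        self.rows: list[tuple] = []

    def on_window(self, window_result: list[tuple]) -> None:
        # Called once per window with the continuous query's result rows.
        if self.mode == "replace":
            self.rows.clear()  # truncate the previous window's contents
        self.rows.extend(window_result)
```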
Event Language
Example Query
SELECT EVENT 'Shoplifting!', D.loc, D.id
FROM   Store S C D PARTITION BY id
WHERE  S.loc = 'shelf' AND C.loc = 'checkout' AND D.loc = 'door'
       AND (FOLLOWS(S, D, 1 hour), NOT PRECEDES(C, D, 1 hour));
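The exact semantics of FOLLOWS and NOT PRECEDES are not spelled out here, so this Python sketch makes three assumptions:

- FOLLOWS(S, D, 1 hour) holds when a door event occurs within an hour after a shelf event for the same id.
- NOT PRECEDES(C, D, 1 hour) holds when no checkout event for that id occurred in the hour before the door event.
- Events arrive in timestamp order.

`detect_shoplifting` is a hypothetical name. Under these assumptions it is enough to remember the latest timestamp per (id, location).

```python
from collections import defaultdict


def detect_shoplifting(events: list[tuple[int, str, str]],
                       window: int = 3600) -> list[tuple[str, str, str]]:
    """events: (ts, id, loc) tuples in timestamp order, ts in seconds."""
    last_seen: defaultdict[str, dict] = defaultdict(dict)  # PARTITION BY id
    alerts = []
    for ts, tag, loc in events:
        seen = last_seen[tag]  # latest timestamp per location for this id
        if loc == "door":
            shelf = seen.get("shelf")
            checkout = seen.get("checkout")
            followed = shelf is not None and ts - shelf <= window
            paid = checkout is not None and ts - checkout <= window
            if followed and not paid:
                alerts.append(("Shoplifting!", loc, tag))
        seen[loc] = ts
    return alerts
```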
Basic Requirements
Adaptivity
Static query planning is undesirable for long-running queries
Either replan or use adaptive planning
Shared Processing
Essential for good performance: 100s of queries not uncommon
Long-lived queries make this more feasible
Graceful Overload Handling
Stream data rates are often highly variable
Often too expensive to provision for maximal data rate
Therefore, must handle overload gracefully
System Architecture
Modified version of PostgreSQL
One-time queries executed normally
Continuous queries planned and executed by the CqRuntime process
Stream input: COPY, or submitted via TCP to the CqIngress process
libevent-based, simple COPY-like protocol
Stream output: cursors, active tables, CqEgress process
Communication between processes done via shared memory queue infrastructure
Message passing done via SysV shmem and locks
Shared Runtime
A newly defined continuous query is passed to the shared runtime via shared memory
The runtime plans the query and folds it into a single shared query plan
Not a traditional tree; graph of operators
Shared Runtime Main Loop
1. Check for control messages: add new CQ, remove CQ, ...
2. Check for new stream tuples
3. Route each stream tuple through the operator graph (CPS)
4. Push output tuples to result consumers
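The four-step loop can be sketched with two in-process queues standing in for the shared-memory queues. This Python version is a toy. Operators are plain filter functions, and routing is deliberately naive: each query's filter is evaluated separately, whereas a real shared runtime routes through one shared operator graph. `run_once` and the message formats `("add", name, fn)` and `("remove", name)` are hypothetical.

```python
import queue
from typing import Callable

Operator = Callable[[dict], bool]  # hypothetical: a filter-style operator


def run_once(control: "queue.Queue[tuple]", tuples: "queue.Queue[dict]",
             queries: dict[str, Operator], results: dict[str, list]) -> None:
    """One iteration of a shared-runtime-style main loop (toy sketch)."""
    # 1. Check for control messages: add or remove continuous queries.
    while True:
        try:
            op, name, *rest = control.get_nowait()
        except queue.Empty:
            break
        if op == "add":
            queries[name] = rest[0]
            results.setdefault(name, [])
        elif op == "remove":
            queries.pop(name, None)
    # 2. Check for new stream tuples (drain whatever has arrived).
    batch = []
    while True:
        try:
            batch.append(tuples.get_nowait())
        except queue.Empty:
            break
    # 3 and 4. Route each tuple through the operators and push
    # output tuples to each query's result consumer.
    for t in batch:
        for name, op_fn in queries.items():
            if op_fn(t):
                results[name].append(t)
```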
Shared Evaluation
Continuous query evaluation done by a network of operators in the shared runtime
If multiple queries reference the same operator, we can evaluate it only once
Better than linear scalability!
Each operator keeps track of the queries it helps to implement
Implementing Shared Evaluation
Sharing Predicates
Simple cases: <, ≤, =, >, ≥, ≠
Construct a tree that divides the domain of the type into disjoint regions
For each tuple: walk the tree to find the region the tuple belongs in
Region implies which queries the tuple is still visible to
Immutable functions can also be shared relatively easily
Sharing Joins, Aggregates
Can also be done
Even between queries with varying windows and predicates
Requires some thought (say, a PhD thesis or two)
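Predicates of the form `column <op> constant` can be shared by sorting their distinct constants. The constants cut the domain into 2n + 1 disjoint regions (open intervals and single points), and each region is labelled once with the set of queries it satisfies. In this Python sketch a binary search over the sorted constants (`bisect`) stands in for walking the tree. It assumes a numeric domain, and `PredicateIndex` is a hypothetical name. Each tuple then costs one O(log n) lookup in the number of distinct constants, instead of one evaluation per query.

```python
import bisect
import operator

OPS = {"<": operator.lt, "<=": operator.le, "=": operator.eq,
       ">": operator.gt, ">=": operator.ge, "!=": operator.ne}


class PredicateIndex:
    """Shared evaluation of `column <op> constant` predicates."""

    def __init__(self, preds: dict[str, tuple[str, float]]) -> None:
        self.bounds = sorted({k for _, k in preds.values()})
        # Label each region once, using one representative value per region.
        self.regions = [
            frozenset(q for q, (op, k) in preds.items() if OPS[op](r, k))
            for r in self._representatives()
        ]

    def _representatives(self) -> list[float]:
        b = self.bounds
        if not b:
            return [0.0]  # no constants: one region covering the whole domain
        reps = [b[0] - 1.0]  # (-inf, b0)
        for i, k in enumerate(b):
            reps.append(k)  # the single point {k}
            if i + 1 < len(b):
                reps.append((k + b[i + 1]) / 2)  # open interval (k, next)
        reps.append(b[-1] + 1.0)  # (b_last, +inf)
        return reps

    def match(self, value: float) -> frozenset:
        """Return the queries whose predicate `value` satisfies."""
        i = bisect.bisect_left(self.bounds, value)
        if i < len(self.bounds) and self.bounds[i] == value:
            return self.regions[2 * i + 1]  # exactly on a boundary point
        return self.regions[2 * i]  # strictly between boundaries
```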
Adaptive Tuple Routing
Given a new tuple, how do we route it through the graph of operators?
Traditional approach: statically choose an optimal route for each stream
Hard optimization problem
Need to re-optimize when new queries are defined or system conditions change (e.g. operator selectivity)
TelegraphCQ approach: adaptive per-tuple routing
Push tuples one at a time through the operator graph; choose order of operators at runtime
Implementing Adaptive Routing
For each tuple, maintain lineage
What operators has this tuple visited?
Which queries can still see this tuple?
Implication: can't push down projections
Make routing decisions on the basis of simple run-time statistics
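Here is a minimal eddy-style router in Python. Each tuple carries its lineage, here just the set of operators it has visited. At each step it goes to the unvisited operator with the lowest observed pass rate, so selective filters migrate to the front as statistics accumulate. The sketch covers only a single conjunctive query of filters, so the "which queries can still see this tuple" half of the lineage is omitted. The smoothing formula and all names are assumptions.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Op:
    name: str
    pred: Callable[[dict], bool]
    seen: int = 0  # tuples routed to this operator
    passed: int = 0  # tuples that survived it

    def selectivity(self) -> float:
        # Laplace-smoothed pass rate; an unseen operator starts at 0.5.
        return (self.passed + 1) / (self.seen + 2)


@dataclass
class Routed:
    row: dict
    done: set = field(default_factory=set)  # lineage: operators visited


def route(row: dict, ops: list[Op]) -> bool:
    """Send one tuple through all filters, choosing the order per tuple."""
    t = Routed(row)
    while len(t.done) < len(ops):
        candidates = [op for op in ops if op.name not in t.done]
        op = min(candidates, key=Op.selectivity)  # most selective first
        op.seen += 1
        t.done.add(op.name)
        if not op.pred(t.row):
            return False  # tuple eliminated; skip remaining operators
        op.passed += 1
    return True
```

After the first tuple, a filter that rejects everything is always tried first, so the pass-everything filter is never visited again for rejected tuples.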
Handling Overload
Common scenario: peak stream rate >> average stream rate (bursty)
The system should cope gracefully
Three alternatives:
1. Spool tuples to disk, process later
   But stream rates often exceed disk throughput
2. Drop excess tuples
3. Substitute statistical summaries for dropped stream tuples
Quality of Service (QoS)
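Alternatives 2 and 3 can be combined: shed tuples once a per-tick buffer is full, but keep a small summary (count and sum) of what was shed. In this Python sketch the drop policy is simply drop-tail, chosen for brevity; a real system might sample randomly or pick victims according to QoS. `LoadShedder`, `Summary` and `sum_with_summary` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Summary:
    count: int = 0
    total: float = 0.0


class LoadShedder:
    """Bounded per-tick buffer that drops excess tuples but summarizes them."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.buffer: list[float] = []
        self.dropped = Summary()

    def offer(self, value: float) -> bool:
        if len(self.buffer) < self.capacity:
            self.buffer.append(value)
            return True
        self.dropped.count += 1  # shed the tuple...
        self.dropped.total += value  # ...but keep a summary of it
        return False

    def drain(self) -> tuple[list[float], Summary]:
        """Hand one tick's worth of work to the query processor."""
        batch, summary = self.buffer, self.dropped
        self.buffer, self.dropped = [], Summary()
        return batch, summary


def sum_with_summary(batch: list[float], summary: Summary) -> float:
    # SUM stays exact thanks to the summary.
    return sum(batch) + summary.total
```

A SUM or COUNT over the tick can still be answered from the summary. Anything that needs the individual dropped tuples, such as predicates or joins, becomes approximate.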
Open Source DSMS
Esper
DSMS engine written in Java (GPL). SQL-like stream query language. http://esper.codehaus.org
TelegraphCQ
Academic prototype from UC Berkeley, based on PostgreSQL 7.3
PostgreSQL's SQL dialect, plus stream-oriented extensions
BSD licensed; http://telegraph.cs.berkeley.edu
StreamCruncher
DSMS engine written in Java. Free for commercial use (not open source). http://www.streamcruncher.com
Proprietary DSMS
StreamBase
A Stonebraker company. Founded in 2003.
Other Startups
Coral8
Apama (purchased by Progress Software in 2005)
and more ...
Established Companies
TIBCO BusinessEvents, Oracle BAM
Amalgamated Insight
Based on the experience gained from TelegraphCQ
New codebase
Application components:
1. Continuous Query Engine
   Modified version of PostgreSQL (currently 8.1.9+)
2. Integration Framework
   Connectors, input/output converters, query management
3. Visualization
Closed Series A funding in June 2006
1.0 release will be available Real Soon Now (currently RC3)
Lesson: PostgreSQL is a huge competitive advantage
We're hiring :-)
Q&A
Thank You. Any Questions?