GraphX & Graph Analytics
GraphX
GraphX is a distributed graph computation framework that unifies
graph-parallel and data-parallel computation for big data analytics.
GraphX is a component of Apache Spark for graphs and graph-parallel
computation.
It unifies ETL (Extract, Transform, Load), exploratory analysis, and
iterative graph computation within a single system.
● Built on top of Apache Spark
● Provides an RDD-based graph abstraction
● Enables graph-parallel computation through the Pregel API
● Combines the power of both data-parallel and graph-parallel systems
Introduction
GraphX is aimed at a specific set of graph tasks.
● It can compute high-level measures of a graph, such as connectedness,
degree distribution, average path length, and triangle counts.
● It can count triangles in the graph and apply the PageRank algorithm
to it.
● It can join graphs together and transform graphs quickly.
● It supports the Pregel API (a model that originated at Google) for
traversing a graph.
● It introduces VertexRDD and EdgeRDD, and the Edge data type.
Getting Started
To get started you first need to import Spark and GraphX into your
project, as follows:
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD
Graphs in Machine Learning Landscape
● Graphs are a flexible and powerful data structure used to
represent entities (nodes/vertices) and the relationships between
them (edges).
● Traditional ML models work well with tabular data, but many
real-world scenarios involve interconnected data — which is
best modeled as graphs.
Graph-Structured Data
Graph-structured data consists of:
● Vertices (Nodes): represent entities (e.g., users, airports)
● Edges (Links): represent relationships or interactions between nodes
The Property Graph
● Definition: A directed multigraph with user-defined objects attached to each
vertex and edge.
● Key Characteristics:
○ Directed Multigraph: Allows multiple parallel edges between the same
source and destination.
○ Vertex Identifier: Each vertex is keyed by a unique 64-bit VertexId.
○ Edges: Each edge is identified by its source and destination vertex IDs.
● Applications:
○ Modeling multiple relationships between the same two vertices (e.g.,
co-worker and friend).
Parameterization of Property Graphs
● Vertex (VD) and Edge (ED) Types:
○ VD: Data associated with each vertex.
○ ED: Data associated with each edge.
● Optimization:
○ When VD or ED is a primitive type (e.g., Int, Double), GraphX stores the
values in specialized arrays, which reduces memory usage.
Heterogeneous Vertex Types
Scenario: Modeling vertices with different property types.
Implementation via Inheritance:
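The inheritance approach can be sketched in plain Scala as follows. The type names (VertexProperty, UserProperty, ProductProperty) and the describe helper are illustrative assumptions, not part of GraphX; a graph over such vertices could then be typed as Graph[VertexProperty, String].

```scala
// Hypothetical property types for a graph that mixes users and products.
sealed trait VertexProperty
case class UserProperty(name: String) extends VertexProperty
case class ProductProperty(name: String, price: Double) extends VertexProperty

// Pattern matching recovers the concrete vertex type when needed.
def describe(v: VertexProperty): String = v match {
  case UserProperty(name)           => s"user $name"
  case ProductProperty(name, price) => s"product $name at $price"
}

// Both kinds of vertex property fit in one collection (or one vertex RDD).
val mixed: Seq[VertexProperty] =
  Seq(UserProperty("alice"), ProductProperty("book", 12.5))
mixed.map(describe).foreach(println)
```

A sealed trait is used here so the compiler can check that the match covers every vertex kind.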
Immutability and Fault Tolerance
● Immutability:
○ Graphs are immutable like RDDs.
○ Changes produce new graphs, reusing unaffected parts of the
original.
● Fault Tolerance:
○ Graphs are partitioned across executors.
○ Partitions can be recreated on different machines if failures occur.
Logical Structure of Property Graphs
● Components:
○ Vertices: RDD encoding properties of each vertex.
○ Edges: RDD encoding properties of each edge.
● Graph Class:
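As a rough sketch, the two members of the Graph class can be accessed as shown here. The summarize helper and the toy graph are assumptions made for illustration, and the snippet is written for a spark-shell style session.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{EdgeRDD, Graph, VertexRDD}

// Assumption: reuse the shell's SparkContext, or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// A property graph exposes its vertices and edges as optimized RDD types.
def summarize[VD, ED](graph: Graph[VD, ED]): (Long, Long) = {
  val vs: VertexRDD[VD] = graph.vertices
  val es: EdgeRDD[ED] = graph.edges
  (vs.count(), es.count())
}

// Hypothetical toy graph: edges 1->2 and 2->3, every vertex attribute set to 0.
val toy = Graph.fromEdgeTuples(
  sc.parallelize(Seq((1L, 2L), (2L, 3L))), defaultValue = 0)
println(summarize(toy)) // (vertex count, edge count)
```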
Optimized RDDs in Property Graphs
● VertexRDD[VD] and EdgeRDD[ED]:
○ Extend and optimize RDD[(VertexId, VD)] and RDD[Edge[ED]].
○ Provide additional graph computation functionality.
● Conceptual Representation:
○ VertexRDD: RDD[(VertexId, VD)]
○ EdgeRDD: RDD[Edge[ED]]
Key Benefits of Property Graphs in GraphX
● Efficient Storage: Optimized memory usage for primitive data
types.
● Flexible Modeling: Supports diverse vertex and edge
properties.
● Distributed and Fault-Tolerant: Handles failures seamlessly.
● Functional Structure: Immutability ensures clean
transformations.
● Rich API: Extends RDD functionality for graph computations.
Example Property Graph
Suppose we want to construct a property graph consisting of the various
collaborators on the GraphX project. The vertex property might contain
the username and occupation. We could annotate edges with a string
describing the relationships between collaborators:
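One possible way to build such a graph is sketched below. The usernames, occupations, and relationship labels are sample values in the style of the Spark GraphX programming guide and should be treated as illustrative data.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Assumption: reuse the shell's SparkContext, or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Vertices: (id, (username, occupation))
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
  (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
  (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))

// Edges: a string describing the relationship between two collaborators
val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
  Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))

// Attribute given to any vertex that an edge mentions but `users` lacks
val defaultUser = ("John Doe", "Missing")
val graph: Graph[(String, String), String] =
  Graph(users, relationships, defaultUser)

// Query the vertex and edge views of the graph
val postdocs = graph.vertices.filter { case (_, (_, pos)) => pos == "postdoc" }.count()
val srcGreaterThanDst = graph.edges.filter(e => e.srcId > e.dstId).count()
```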
Graph-Structured Data
Example:
● Social Network:
○ Nodes = users
○ Edges = friendships
Types of graphs:
● Directed vs Undirected
● Weighted vs Unweighted
● Homogeneous vs Heterogeneous
Applications & Examples
1. Social Network Analysis
Graphs represent:
● Users → Nodes
● Friendships or follows → Edges
🔹 Use Case: Node Classification
● Goal: Predict the type of a user (e.g., spam vs real, interests, age group).
● Example:
○ Facebook graph with users and friends.
○ Train a model to classify users into categories based on their connections.
🔹 Use Case: Link Prediction
● Goal: Predict if a link (friendship) is likely to form.
● Example:
○ Suggesting "People You May Know" on Facebook.
○ Based on mutual friends, interests, and interaction patterns.
2. Recommendation Systems
Graphs represent:
● Users and items (movies, books, products) as nodes
● Interactions (e.g., rating, purchase) as edges
🔹 Use Case: Link Prediction
● Goal: Recommend products a user might buy.
● Example:
○ In Amazon, if User A likes Book 1 and Book 2, and User B likes
Book 2 and Book 3, then Book 1 can be recommended to User B.
○ This is modeled as collaborative filtering on a bipartite graph.
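The book example can be traced with a few lines of plain Scala. This is a naive shared-item heuristic written only to illustrate the bipartite view; the recommend function and the data layout are assumptions, not a production collaborative filtering method.

```scala
// Bipartite "likes" edges from the example: user -> item.
val likes: Seq[(String, String)] = Seq(
  ("A", "Book 1"), ("A", "Book 2"),
  ("B", "Book 2"), ("B", "Book 3"))

// Items liked by each user.
val itemsByUser: Map[String, Set[String]] =
  likes.groupBy(_._1).map { case (user, pairs) => user -> pairs.map(_._2).toSet }

// Recommend items liked by users who share at least one item with `user`,
// excluding items the user already likes.
def recommend(user: String): Set[String] = {
  val mine = itemsByUser.getOrElse(user, Set.empty[String])
  val similarUsers = itemsByUser.filter { case (other, items) =>
    other != user && (items & mine).nonEmpty
  }
  similarUsers.values.flatten.toSet -- mine
}

println(recommend("B")) // Set(Book 1)
```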
3. Drug Discovery (Bioinformatics)
Graphs represent:
● Atoms → Nodes
● Chemical bonds → Edges
🔹 Use Case: Graph Classification
● Goal: Predict the property of a molecule.
● Example:
○ Classify if a molecule is toxic or not.
○ Model learns structure–activity relationship (SAR) from
molecular graphs.
○ Graph Neural Networks (GNNs) like Graph Convolutional
Networks (GCNs) are commonly used.
Introduction to Graph Operators
● Definition: Graph operators are functions applied to graphs to
transform, analyze, or retrieve information from their vertices
and edges.
● Key Objectives:
○ Enable transformations and computations on graph data.
○ Provide flexibility and efficiency in graph processing.
● Categories:
○ Property Operators: Modify vertex or edge attributes.
○ Structural Operators: Alter graph structure.
○ Join Operators: Integrate external data with graph
elements.
● Application Areas:
○ Social network analysis.
○ Recommendation systems.
○ Fraud detection.
Property Operators
● Purpose: Transform vertex or edge properties without changing
the graph structure.
● Preserve structural indices for optimization.
● Initialize graphs for specific computations.
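A minimal sketch of the three property operators (mapVertices, mapEdges, mapTriplets) follows. The toy graph and the transformations applied to it are assumptions chosen for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.Graph

// Assumption: reuse the shell's SparkContext, or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical toy graph: edges 1->2 and 2->3, every vertex attribute set to 1.
// fromEdgeTuples also sets every edge attribute to 1.
val graph: Graph[Int, Int] = Graph.fromEdgeTuples(
  sc.parallelize(Seq((1L, 2L), (2L, 3L))), defaultValue = 1)

// mapVertices: new vertex attributes, same structure
val doubled = graph.mapVertices((id, attr) => attr * 2)
// mapEdges: new edge attributes computed from each edge alone
val labeled = graph.mapEdges(e => s"${e.srcId}->${e.dstId}")
// mapTriplets: new edge attributes computed from the edge and both endpoints
val summed = graph.mapTriplets(t => t.srcAttr + t.dstAttr)
```

Each call returns a new graph with the same vertices and edges, so the structural indices of the original can be reused.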
Structural Operators
● Purpose: Modify the structure of the graph.
Examples:
● Removing invalid vertices or edges.
● Simplifying multigraphs.
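The two examples above might look like the following sketch, which uses subgraph to drop invalid vertices and groupEdges to merge parallel edges. The vertex labels and edge weights are made-up data.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy, VertexId}
import org.apache.spark.rdd.RDD

// Assumption: reuse the shell's SparkContext, or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical multigraph: two parallel edges 1->2 and one vertex marked invalid.
val vertices: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "ok"), (2L, "ok"), (3L, "invalid")))
val edges: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
val graph = Graph(vertices, edges)

// Remove invalid vertices; edges that touch them are dropped as well.
val valid = graph.subgraph(vpred = (id, attr) => attr != "invalid")

// Simplify the multigraph: merge parallel edges by summing their attributes.
// groupEdges only merges edges in the same partition, hence the partitionBy.
val simple = valid
  .partitionBy(PartitionStrategy.RandomVertexCut)
  .groupEdges(_ + _)

// reverse flips the direction of every edge.
val reversed = simple.reverse
```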
Join Operators
● Purpose: Combine external data with graph elements.
Applications:
● Enriching graph data with external attributes.
● Integrating results from other computations.
joinVertices: Joins the vertices with an RDD and updates the properties of the
vertices that have a match; unmatched vertices keep their original value.
val updatedGraph = graph.joinVertices(extraData)((id, oldAttr, newAttr) => newAttr)
outerJoinVertices: Applies the map function to every vertex; the joined value
arrives as an Option, so unmatched vertices can be handled explicitly.
val updatedGraph = graph.outerJoinVertices(extraData)((id, attr, opt) => opt.getOrElse(attr))
Advanced Graph Computations
Aggregate Messages:
● Collects and aggregates information from neighboring vertices.
Pregel API:
● Iterative computation framework for graph processing.
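As an illustration of the Pregel API, the sketch below computes single-source shortest paths on a three-vertex graph. The edge weights and the choice of source vertex are assumptions made for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Assumption: reuse the shell's SparkContext, or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical weighted edges: 1->2 (1.0), 2->3 (2.0), 1->3 (5.0)
val edges: RDD[Edge[Double]] = sc.parallelize(Seq(
  Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0)))
val sourceId: VertexId = 1L

// The source starts at distance 0, every other vertex at infinity.
val initialGraph = Graph.fromEdges(edges, 0.0)
  .mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  // Vertex program: keep the smaller of the current and received distance
  (id, dist, newDist) => math.min(dist, newDist),
  // Send message: offer a shorter path to the destination, if one exists
  triplet => {
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  // Merge messages: keep the shortest offered distance
  (a, b) => math.min(a, b))

sssp.vertices.collect().sortBy(_._1).foreach(println)
```

The computation stops when no vertex receives a message, which here happens once no shorter path can be offered.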
Neighborhood Aggregation in GraphX
● Neighborhood aggregation is central to many graph analytics tasks.
● Examples include:
○ Counting followers.
○ Calculating average attributes (e.g., age of followers).
○ Iterative algorithms like PageRank, Shortest Path, and Connected
Components.
● The core aggregation operator changed from mapReduceTriplets to
aggregateMessages for improved performance.
AggregateMessages Operator
● Purpose: Core operation for neighborhood aggregation.
● Definition:
Components:
1. sendMsg: Map function to send messages via EdgeContext.
2. mergeMsg: Reduce function to aggregate messages.
3. tripletFields: Optional argument to optimize join strategies by specifying accessed fields.
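The three components can be seen together in a small follower-count sketch. The toy edge list is an assumption; the signature shown in the comment is the one documented for aggregateMessages.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Graph, TripletFields, VertexRDD}

// Assumption: reuse the shell's SparkContext, or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Documented signature, for reference:
//   def aggregateMessages[Msg: ClassTag](
//       sendMsg: EdgeContext[VD, ED, Msg] => Unit,
//       mergeMsg: (Msg, Msg) => Msg,
//       tripletFields: TripletFields = TripletFields.All): VertexRDD[Msg]

// Hypothetical "follows" edges: 1->3, 2->3, 3->4
val graph = Graph.fromEdgeTuples(
  sc.parallelize(Seq((1L, 3L), (2L, 3L), (3L, 4L))), defaultValue = 0)

val followerCounts: VertexRDD[Int] = graph.aggregateMessages[Int](
  ctx => ctx.sendToDst(1), // sendMsg: each edge sends 1 to its destination
  _ + _,                   // mergeMsg: add up the messages per vertex
  TripletFields.None)      // tripletFields: no vertex or edge attributes are read

followerCounts.collect().sortBy(_._1).foreach(println)
```

Vertices 1 and 2 receive no messages, so they do not appear in the result.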
Key Features
● Explicit user control over accessed fields (TripletFields).
● Returns VertexRDD[Msg] containing aggregated messages.
● Vertices without messages are excluded.
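● Performs best when messages (and their merged results) are constant-sized,
e.g., numbers combined by addition rather than lists combined by concatenation.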
Example: Average Age of Older Followers
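import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators
// Random graph whose vertex attribute (the "age") is the vertex ID
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices((id, _) => id.toDouble)
// For each vertex, count its older followers and sum their ages
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => { // sendMsg: only older followers send (count, age) to the destination
    if (triplet.srcAttr > triplet.dstAttr) {
      triplet.sendToDst((1, triplet.srcAttr))
    }
  },
  (a, b) => (a._1 + b._1, a._2 + b._2) // mergeMsg: add counts and ages
)
// Divide the total age by the count to get the average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues((id, value) =>
    value match { case (count, totalAge) => totalAge / count })
avgAgeOfOlderFollowers.collect.foreach(println(_))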
Legacy Operator: mapReduceTriplets
Replaced by aggregateMessages due to:
● Inefficiency of iterator-based message aggregation.
● Limited optimization capabilities.
The same computation in both styles (the two versions are alternatives, not one program):
val graph: Graph[Int, Float] = ...
// Legacy form: mapReduceTriplets
def msgFun(triplet: EdgeTriplet[Int, Float]): Iterator[(VertexId, String)] =
  Iterator((triplet.dstId, "Hi"))
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)
// Current form: aggregateMessages
def msgFun(triplet: EdgeContext[Int, Float, String]): Unit = {
  triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)
Common Aggregation Tasks
1. Degree Information:
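// Reduce operation that keeps the vertex with the higher degree
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
  if (a._2 > b._2) a else b
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
2. Collecting Neighbors:
// VD stands for the graph's vertex attribute type
val neighborIds: VertexRDD[Array[VertexId]] =
  graph.collectNeighborIds(EdgeDirection.Out)
val neighbors: VertexRDD[Array[(VertexId, VD)]] =
  graph.collectNeighbors(EdgeDirection.In)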
Collecting Neighbors
In some cases it may be easier to express computation by collecting neighboring
vertices and their attributes at each vertex. This can be easily accomplished using
the collectNeighborIds and the collectNeighbors operators.
class GraphOps[VD, ED] {
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
}
Distributed Graphs
GraphX adopts a vertex-cut approach to distributed graph partitioning.
● GraphX partitions the graph along vertices which can reduce both the
communication and storage overhead.
● Logically, this corresponds to assigning edges to machines and
allowing vertices to span multiple machines.
● The exact method of assigning edges depends on the PartitionStrategy
and there are several tradeoffs to the various heuristics.
● Users can choose between different strategies by repartitioning the
graph with the Graph.partitionBy operator.
● The default partitioning strategy is to use the initial partitioning of the
edges as provided on graph construction.
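Repartitioning with Graph.partitionBy could look like the sketch below. The toy edge list and the choice of EdgePartition2D are assumptions; the other built-in strategies (RandomVertexCut, CanonicalRandomVertexCut, EdgePartition1D) are passed the same way.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Graph, PartitionStrategy}

// Assumption: reuse the shell's SparkContext, or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical toy graph; its edges start in whatever partitions the input RDD had.
val graph = Graph.fromEdgeTuples(
  sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L), (1L, 4L))), defaultValue = 0)

// Reassign edges to partitions using a 2D partitioning of the adjacency matrix.
val repartitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D)

// Only the physical placement of edges changes, not the graph itself.
println(repartitioned.edges.count())
```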
Graph Algorithms
PageRank Algorithm
● What is PageRank?
○ Measures the importance of each vertex in a graph based on the
edges (or connections).
○ Works on the premise that an edge from node u to node v
represents an endorsement of v’s importance by u.
○ Example: A Twitter user with many followers will have a higher
rank (importance).
● Types of PageRank in GraphX:
○ Static PageRank: Runs for a fixed number of iterations.
○ Dynamic PageRank: Runs until the ranks converge (i.e., stop
changing by more than a specified tolerance).
Using PageRank in GraphX
● How to use PageRank:
○ PageRank object: Provides the static and dynamic implementations as
methods.
○ GraphOps: Allows calling these algorithms directly as methods on the graph.
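A brief sketch of both variants on a hand-built graph follows. The edge list, the iteration count of 10, and the tolerance of 0.0001 are assumptions chosen for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.Graph

// Assumption: reuse the shell's SparkContext, or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical "follows" edges: 1->3, 2->3, 3->1
val graph = Graph.fromEdgeTuples(
  sc.parallelize(Seq((1L, 3L), (2L, 3L), (3L, 1L))), defaultValue = 0)

// Static PageRank: a fixed number of iterations
val staticRanks = graph.staticPageRank(numIter = 10).vertices
// Dynamic PageRank: iterate until ranks change by less than the tolerance
val dynamicRanks = graph.pageRank(tol = 0.0001).vertices

// The same algorithms are also reachable through the PageRank object:
//   PageRank.run(graph, numIter = 10)
//   PageRank.runUntilConvergence(graph, tol = 0.0001)

// Vertex with the highest rank under each variant
val topStatic = staticRanks.reduce((a, b) => if (a._2 > b._2) a else b)._1
val topDynamic = dynamicRanks.reduce((a, b) => if (a._2 > b._2) a else b)._1
```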
● Example Dataset:
○ Social Network Example:
■ Users are listed in data/graphx/users.txt.
■ Relationships between users are in data/graphx/followers.txt.
○ Goal: Compute the PageRank for each user based on their
followers.
import org.apache.spark.graphx.GraphLoader
// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1)) }
val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))
What are Connected Components?
● A connected component is a subgraph in which any two vertices are
connected by paths.
● Each connected component is labeled with the ID of its
lowest-numbered vertex.
● Example: In a social network, connected components can represent
clusters of users who are all connected directly or indirectly.
Connected Components in GraphX
● GraphX Implementation:
○ The algorithm is available in the ConnectedComponents object
in GraphX.
○ Labels each connected component in the graph with a unique ID
(the ID of the lowest-numbered vertex).
● How It Works:
○ The connected component algorithm runs on a graph and assigns
the same ID to all vertices in the same connected component.
○ Vertices with no edges are considered individual components.
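The labeling rule can be checked on a small hand-built graph, sketched here under the assumption of two separate groups of vertices.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.Graph

// Assumption: reuse the shell's SparkContext, or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical graph with two components: {1, 2, 3} and {4, 5}
val graph = Graph.fromEdgeTuples(
  sc.parallelize(Seq((1L, 2L), (2L, 3L), (4L, 5L))), defaultValue = 0)

// Each vertex is labeled with the lowest vertex ID in its component.
val labels = graph.connectedComponents().vertices.collect().toMap
labels.toSeq.sortBy(_._1).foreach(println) // vertices 1-3 get label 1; 4-5 get label 4
```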
import org.apache.spark.graphx.GraphLoader
// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))}
val ccByUsername = users.join(cc).map {
case (id, (username, cc)) => (username, cc)}
// Print the result
println(ccByUsername.collect().mkString("\n"))
Triangle Counting in GraphX
● What is Triangle Counting?
○ A vertex is part of a triangle if it has two adjacent vertices with an
edge between them.
○ Triangle counting provides a measure of clustering by counting
how many triangles pass through a vertex.
● GraphX Implementation:
○ The triangle counting algorithm is available in the TriangleCount
object in GraphX.
○ The algorithm counts the number of triangles that pass through
each vertex.
● How It Works:
○ A triangle is formed by three vertices in which every pair is
connected by an edge.
○ The per-vertex triangle count is a measure of clustering: it indicates
how tightly each vertex's neighbors are connected to one another.
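A small sketch makes the per-vertex counts concrete. The edge list is an assumption: one triangle (1, 2, 3) plus an extra edge to vertex 4, with every edge already in canonical order (source ID less than destination ID).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Graph, PartitionStrategy}

// Assumption: reuse the shell's SparkContext, or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical graph: triangle 1-2-3 and a dangling edge 3->4
val graph = Graph.fromEdgeTuples(
  sc.parallelize(Seq((1L, 2L), (2L, 3L), (1L, 3L), (3L, 4L))), defaultValue = 0)
  .partitionBy(PartitionStrategy.RandomVertexCut)

// Number of triangles passing through each vertex
val triangles = graph.triangleCount().vertices.collect().toMap
triangles.toSeq.sortBy(_._1).foreach(println) // 1, 2, 3 -> 1 triangle each; 4 -> 0
```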
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
  .partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1)) }
val triCountByUsername = users.join(triCounts).map {
  case (id, (username, tc)) => (username, tc) }
// Print the result
println(triCountByUsername.collect().mkString("\n"))