
GraphX & Graph Analytics

GraphX is a distributed graph computation framework within Apache Spark that integrates graph and data parallel computation for Big Data Analytics. It supports various graph operations, including property graphs, neighborhood aggregation, and advanced graph algorithms like PageRank and connected components. GraphX is designed for efficient storage, flexible modeling, and fault tolerance, making it suitable for applications in social network analysis, recommendation systems, and drug discovery.

Uploaded by nainalashalini
Copyright © All Rights Reserved

GraphX
GraphX is a distributed graph computation framework that unifies graph-parallel and data-parallel computation for Big Data Analytics.
GraphX is a component of Apache Spark for graphs and graph-parallel computation. It unifies ETL (Extract, Transform, Load), exploratory analysis, and iterative graph computation within a single system.
● Built on top of Apache Spark
● Supports an RDD-based graph abstraction
● Enables graph-parallel computation through the Pregel API
● Combines the power of both data-parallel and graph-parallel systems
Introduction
GraphX targets a specific set of graph-analysis tasks:
● It can measure things like “connectedness”, degree distribution, average path length, and triangle counts: high-level measures of a graph.
● It can count triangles in the graph and apply the PageRank algorithm to it.
● It can also join graphs together and transform graphs quickly.
● It supports a Pregel API (modeled on Google's Pregel system) for traversing a graph.
● It introduces VertexRDD and EdgeRDD, and the Edge data type.
Getting Started

To get started, you first need to import Spark and GraphX into your project, as follows:
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD
Graphs in Machine Learning Landscape

● Graphs are a flexible and powerful data structure used to represent entities (nodes/vertices) and the relationships between them (edges).
● Traditional ML models work well with tabular data, but many real-world scenarios involve interconnected data, which is best modeled as graphs.
Graph-Structured Data

Graph-structured data consists of:
● Vertices (Nodes): represent entities (e.g., users, airports)
● Edges (Links): represent relationships or interactions between nodes


The Property Graph

● Definition: A directed multigraph with user-defined objects attached to each vertex and edge.
● Key Characteristics:
○ Directed Multigraph: Allows multiple parallel edges between the same source and destination vertices.
○ Vertex Identifier: Each vertex is keyed by a unique 64-bit identifier (VertexId).
○ Edge Endpoints: Each edge carries the IDs of its source and destination vertices.
● Applications:
○ Modeling multiple relationships between the same two vertices (e.g., co-worker and friend).
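To make the multigraph idea concrete, here is a minimal plain-Scala sketch (no Spark) of a property graph whose two vertices are linked by two parallel edges with different relationships. The names `LocalEdge`, `LocalPropertyGraph`, and `edgesBetween`, and the sample names, are hypothetical stand-ins for illustration; in GraphX the corresponding types are `Graph`, `Edge`, and `VertexId` (an alias for `Long`).

```scala
object PropertyGraphSketch {
  // GraphX defines VertexId as a 64-bit Long; this sketch mirrors that alias.
  type VertexId = Long

  // Hypothetical local edge: source ID, destination ID, user-defined attribute.
  final case class LocalEdge[ED](srcId: VertexId, dstId: VertexId, attr: ED)

  // Hypothetical local property graph with vertex type VD and edge type ED.
  final case class LocalPropertyGraph[VD, ED](
      vertices: Map[VertexId, VD],
      edges: Seq[LocalEdge[ED]]
  ) {
    // All edges from src to dst; a multigraph may contain several.
    def edgesBetween(src: VertexId, dst: VertexId): Seq[LocalEdge[ED]] =
      edges.filter(e => e.srcId == src && e.dstId == dst)
  }

  // Two people connected by two parallel edges with different relationships.
  val graph: LocalPropertyGraph[String, String] = LocalPropertyGraph(
    vertices = Map(1L -> "alice", 2L -> "bob"),
    edges = Seq(
      LocalEdge(1L, 2L, "co-worker"),
      LocalEdge(1L, 2L, "friend")
    )
  )
}
```

Because edges are directed, `edgesBetween(1L, 2L)` returns both relationships while `edgesBetween(2L, 1L)` is empty.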
Parameterization of Property Graphs

● Vertex (VD) and Edge (ED) Types:
○ VD: Data associated with each vertex.
○ ED: Data associated with each edge.
● Optimization:
○ Primitive types (e.g., int, double) reduce memory usage because GraphX stores them in specialized arrays.
Heterogeneous Vertex Types

Scenario: Modeling vertices with different property types.
Implementation via Inheritance:
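A minimal plain-Scala sketch of the inheritance approach: all vertex properties share a common supertype, which becomes the graph's single vertex type. The class names follow the pattern used in the GraphX programming guide (`VertexProperty`, `UserProperty`, `ProductProperty`); the sample vertices are made up, and a local `Map` stands in for the distributed vertex collection. With Spark on the classpath, the graph would then have a type such as `Graph[VertexProperty, String]`.

```scala
object HeterogeneousVertices {
  // Common supertype used as the single vertex property type (VD).
  sealed trait VertexProperty
  final case class UserProperty(name: String) extends VertexProperty
  final case class ProductProperty(name: String, price: Double) extends VertexProperty

  // Local vertex table keyed by 64-bit IDs (illustrative data).
  val vertices: Map[Long, VertexProperty] = Map(
    1L -> UserProperty("alice"),
    2L -> ProductProperty("laptop", 999.0)
  )

  // Pattern matching recovers the concrete type of each vertex.
  def describe(p: VertexProperty): String = p match {
    case UserProperty(name)           => s"user $name"
    case ProductProperty(name, price) => s"product $name at $price"
  }
}
```

Sealing the trait lets the compiler check that `describe` handles every vertex kind.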
Immutability and Fault Tolerance

● Immutability:
○ Graphs are immutable, like RDDs.
○ Changes produce new graphs that reuse the unaffected parts of the original.
● Fault Tolerance:
○ Graphs are partitioned across executors.
○ Partitions can be recreated on different machines if failures occur.
Logical Structure of Property Graphs
● Components:
○ Vertices: RDD encoding properties of each vertex.
○ Edges: RDD encoding properties of each edge.

● Graph Class:
Optimized RDDs in Property Graphs
● VertexRDD[VD] and EdgeRDD[ED]:
○ Extend and optimize RDD[(VertexId, VD)] and RDD[Edge[ED]].
○ Provide additional graph computation functionality.

● Conceptual Representation:
○ VertexRDD: RDD[(VertexId, VD)]
○ EdgeRDD: RDD[Edge[ED]]
Key Benefits of Property Graphs in GraphX

● Efficient Storage: Optimized memory usage for primitive data types.
● Flexible Modeling: Supports diverse vertex and edge properties.
● Distributed and Fault-Tolerant: Lost partitions are recomputed after failures.
● Functional Structure: Immutability ensures clean transformations.
● Rich API: Extends RDD functionality for graph computations.
Example Property Graph
Suppose we want to construct a property graph consisting of the various
collaborators on the GraphX project. The vertex property might contain
the username and occupation. We could annotate edges with a string
describing the relationships between collaborators:
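A local sketch of that collaborator graph in plain Scala. The usernames, occupations, and relationships are the sample values from the GraphX programming guide's example; `LocalEdge` is a hypothetical stand-in for GraphX's `Edge`, and a `Map`/`Seq` pair replaces the RDDs that would be passed to `Graph(...)` in Spark. The last value imitates the triplet view, which pairs each edge with the attributes of its two endpoints.

```scala
object CollaboratorGraph {
  type VertexId = Long
  // Hypothetical local edge type carrying a relationship string.
  final case class LocalEdge[ED](srcId: VertexId, dstId: VertexId, attr: ED)

  // Vertex property: (username, occupation)
  val users: Map[VertexId, (String, String)] = Map(
    (3L, ("rxin", "student")),
    (7L, ("jgonzal", "postdoc")),
    (5L, ("franklin", "prof")),
    (2L, ("istoica", "prof"))
  )

  // Edge property: relationship between collaborators
  val relationships: Seq[LocalEdge[String]] = Seq(
    LocalEdge(3L, 7L, "collab"),
    LocalEdge(5L, 3L, "advisor"),
    LocalEdge(2L, 5L, "colleague"),
    LocalEdge(5L, 7L, "pi")
  )

  // Count users whose occupation is "postdoc"
  val postdocCount: Int = users.count { case (_, (_, occupation)) => occupation == "postdoc" }

  // Count edges whose source ID is larger than their destination ID
  val descendingEdges: Int = relationships.count(e => e.srcId > e.dstId)

  // Triplet-style view: each edge joined with both endpoint usernames
  val facts: Seq[String] = relationships.map { e =>
    s"${users(e.srcId)._1} is the ${e.attr} of ${users(e.dstId)._1}"
  }
}
```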
Graph-Structured Data

Example:
● Social Network:
○ Nodes = users
○ Edges = friendships
Types of graphs:
● Directed vs Undirected
● Weighted vs Unweighted
● Homogeneous vs Heterogeneous
Applications & Examples

1. Social Network Analysis

Graphs represent:
● Users → Nodes
● Friendships or follows → Edges

🔹 Use Case: Node Classification
● Goal: Predict the type of a user (e.g., spam vs real, interests, age group).
● Example:
○ Facebook graph with users and friends.
○ Train a model to classify users into categories based on their connections.

🔹 Use Case: Link Prediction
● Goal: Predict whether a link (friendship) is likely to form.
● Example:
○ Suggesting "People You May Know" on Facebook.
○ Suggestions are based on mutual friends, interests, and interaction patterns.
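The mutual-friends signal can be sketched as a common-neighbors score: for a given user, rank every non-friend by how many friends the two share. This plain-Scala sketch uses a made-up five-user graph and only this one signal; it is not how Facebook or GraphX computes suggestions.

```scala
object MutualFriends {
  // Undirected friendship graph as adjacency sets (illustrative data).
  val friends: Map[Long, Set[Long]] = {
    val pairs = Seq((1L, 2L), (1L, 3L), (2L, 3L), (2L, 4L), (3L, 4L), (4L, 5L))
    // Store each friendship in both directions.
    pairs
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (v, ps) => v -> ps.map(_._2).toSet }
  }

  // Score each non-friend v of u by the number of mutual friends,
  // dropping zero scores and sorting by score (descending), then by ID.
  def suggestions(u: Long): Seq[(Long, Int)] = {
    val mine = friends.getOrElse(u, Set.empty[Long])
    friends.keys.toSeq
      .filter(v => v != u && !mine.contains(v))
      .map(v => v -> (mine intersect friends(v)).size)
      .filter(_._2 > 0)
      .sortBy { case (v, score) => (-score, v) }
  }
}
```

For user 1, whose friends are 2 and 3, the only suggestion is user 4, who shares both of them.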


2. Recommendation Systems

Graphs represent:
● Users and items (movies, books, products) as nodes
● Interactions (e.g., rating, purchase) as edges

🔹 Use Case: Link Prediction
● Goal: Recommend products a user might buy.
● Example:
○ On Amazon, if User A likes Book 1 and Book 2, and User B likes Book 2 and Book 3, then Book 1 can be recommended to User B.
○ This is modeled as collaborative filtering on a bipartite graph.
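The book example can be sketched as a walk on the bipartite user-item graph: from a user, find other users who liked at least one of the same items, then collect those users' other items. This plain-Scala sketch implements only that neighborhood heuristic on the data from the example; it is a simplified stand-in for collaborative filtering, not a trained model such as matrix factorization.

```scala
object BipartiteRecommender {
  // Bipartite graph stored as user -> liked items (data from the example above).
  val likes: Map[String, Set[String]] = Map(
    "User A" -> Set("Book 1", "Book 2"),
    "User B" -> Set("Book 2", "Book 3")
  )

  // Recommend items liked by users who share at least one item with `user`,
  // excluding items the user already likes.
  def recommend(user: String): Set[String] = {
    val mine = likes.getOrElse(user, Set.empty[String])
    likes.iterator
      .filter { case (other, theirs) => other != user && (theirs intersect mine).nonEmpty }
      .flatMap { case (_, theirs) => theirs diff mine }
      .toSet
  }
}
```

`recommend("User B")` reaches User A through the shared Book 2 and returns Book 1, matching the example.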


3. Drug Discovery (Bioinformatics)

Graphs represent:
● Atoms → Nodes
● Chemical bonds → Edges
🔹 Use Case: Graph Classification
● Goal: Predict a property of a molecule.
● Example:
○ Classify whether a molecule is toxic.
○ The model learns the structure–activity relationship (SAR) from molecular graphs.
○ Graph Neural Networks (GNNs) such as Graph Convolutional Networks (GCNs) are commonly used.
Introduction to Graph Operators

● Definition: Graph operators are functions applied to graphs to transform, analyze, or retrieve information from their vertices and edges.
● Key Objectives:
○ Enable transformations and computations on graph data.
○ Provide flexibility and efficiency in graph processing.

● Categories:
○ Property Operators: Modify vertex or edge attributes.
○ Structural Operators: Alter graph structure.
○ Join Operators: Integrate external data with graph
elements.
● Application Areas:
○ Social network analysis.
○ Recommendation systems.
○ Fraud detection.
Property Operators

● Purpose: Transform vertex or edge properties without changing the graph structure.
● Because the structure is unchanged, the result reuses the original graph's structural indices.
● Useful for initializing graphs for specific computations.
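GraphX's property operators include `mapVertices`, `mapEdges`, and `mapTriplets`. The plain-Scala sketch below imitates the first two on a local graph (the `LocalGraph` and `LocalEdge` types and the sample data are hypothetical) to show the key property: attributes change while vertex IDs and edge endpoints stay the same, and the original graph is left untouched.

```scala
object PropertyOperatorsSketch {
  final case class LocalEdge[ED](srcId: Long, dstId: Long, attr: ED)

  final case class LocalGraph[VD, ED](vertices: Map[Long, VD], edges: Seq[LocalEdge[ED]]) {
    // Like mapVertices: new vertex attributes, same IDs and same edges.
    def mapVertices[VD2](f: (Long, VD) => VD2): LocalGraph[VD2, ED] =
      LocalGraph(vertices.map { case (id, attr) => id -> f(id, attr) }, edges)

    // Like mapEdges: new edge attributes, same endpoints and same vertices.
    def mapEdges[ED2](f: LocalEdge[ED] => ED2): LocalGraph[VD, ED2] =
      LocalGraph(vertices, edges.map(e => LocalEdge(e.srcId, e.dstId, f(e))))
  }

  val graph: LocalGraph[String, Int] = LocalGraph(
    Map(1L -> "a", 2L -> "b"),
    Seq(LocalEdge(1L, 2L, 3))
  )

  // Initialize every vertex to 0.0, e.g. before an iterative algorithm.
  val initialized: LocalGraph[Double, Int] = graph.mapVertices((_, _) => 0.0)

  // Convert integer edge weights to doubles.
  val weighted: LocalGraph[String, Double] = graph.mapEdges(e => e.attr.toDouble)
}
```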
Structural Operators

● Purpose: Modify the structure of the graph.

Examples:
● Removing invalid vertices or edges.
● Simplifying multigraphs.
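The two examples correspond to GraphX's `subgraph` and `groupEdges` operators. This plain-Scala sketch imitates both on a made-up local graph (`LocalGraph` and `LocalEdge` are hypothetical): `subgraph` drops an invalid vertex together with every edge that touches it, and `groupEdges` merges parallel edges by summing their weights. In GraphX itself, `groupEdges` only merges parallel edges that sit in the same partition, so the graph is normally repartitioned with `partitionBy` first.

```scala
object StructuralOperatorsSketch {
  final case class LocalEdge[ED](srcId: Long, dstId: Long, attr: ED)

  final case class LocalGraph[VD, ED](vertices: Map[Long, VD], edges: Seq[LocalEdge[ED]]) {
    // Keep vertices satisfying vpred, and edges satisfying epred whose endpoints both survive.
    def subgraph(epred: LocalEdge[ED] => Boolean, vpred: (Long, VD) => Boolean): LocalGraph[VD, ED] = {
      val kept = vertices.filter { case (id, attr) => vpred(id, attr) }
      LocalGraph(kept, edges.filter(e => epred(e) && kept.contains(e.srcId) && kept.contains(e.dstId)))
    }

    // Merge parallel edges (same source and destination) into a single edge.
    def groupEdges(merge: (ED, ED) => ED): LocalGraph[VD, ED] = {
      val merged = edges
        .groupBy(e => (e.srcId, e.dstId))
        .toSeq
        .sortBy(_._1)
        .map { case ((src, dst), es) => LocalEdge(src, dst, es.map(_.attr).reduce(merge)) }
      LocalGraph(vertices, merged)
    }
  }

  // Vertex 4 is invalid (missing user data); the two 1 -> 2 edges are parallel.
  val graph: LocalGraph[String, Int] = LocalGraph(
    Map(1L -> "alice", 2L -> "bob", 3L -> "carol", 4L -> "Missing"),
    Seq(LocalEdge(1L, 2L, 1), LocalEdge(1L, 2L, 1), LocalEdge(2L, 3L, 1), LocalEdge(3L, 4L, 1))
  )

  // Remove the invalid vertex and the edge that touches it.
  val valid: LocalGraph[String, Int] = graph.subgraph(_ => true, (_, name) => name != "Missing")

  // Collapse parallel edges by summing their weights.
  val simple: LocalGraph[String, Int] = valid.groupEdges(_ + _)
}
```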
Join Operators

● Purpose: Combine external data with graph elements.

Applications:
● Enriching graph data with external attributes.
● Integrating results from other computations.
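joinVertices: Joins an RDD with the vertices and updates the properties of matched vertices; vertices without a match keep their original values.
// extraData: RDD[(VertexId, VD)] holding the new vertex values
val updatedGraph = graph.joinVertices(extraData) {
  (id, oldAttr, newAttr) => newAttr
}

outerJoinVertices: Joins an RDD with the vertices and applies the map function to every vertex, including unmatched ones (which receive None), so the vertex type may change.
// Keep the joined value when present, otherwise the original attribute
val updatedGraph = graph.outerJoinVertices(extraData) {
  (id, attr, opt) => opt.getOrElse(attr)
}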
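The difference between the two join operators is easiest to see on a local map of vertex attributes. This plain-Scala sketch reimplements only their matching semantics with hypothetical helper functions and made-up data, not Spark code: `joinVertices` keeps the original value when a vertex has no match and cannot change the vertex type, while `outerJoinVertices` calls the map function for every vertex with an `Option` and may return a new type.

```scala
object JoinOperatorsSketch {
  // Vertex attributes of an existing graph (vertex ID -> attribute).
  val vertices: Map[Long, String] = Map(1L -> "alice", 2L -> "bob", 3L -> "carol")
  // External data keyed by vertex ID; vertex 3 has no entry.
  val extraData: Map[Long, String] = Map(1L -> "Alice Smith", 2L -> "Bob Jones")

  // joinVertices semantics: matched vertices get mapFunc's result,
  // unmatched vertices keep their value; the vertex type stays String.
  def joinVertices[U](table: Map[Long, U])(mapFunc: (Long, String, U) => String): Map[Long, String] =
    vertices.map { case (id, attr) =>
      id -> table.get(id).map(u => mapFunc(id, attr, u)).getOrElse(attr)
    }

  // outerJoinVertices semantics: mapFunc sees every vertex with an Option,
  // and the result type VD2 may differ from the original vertex type.
  def outerJoinVertices[U, VD2](table: Map[Long, U])(mapFunc: (Long, String, Option[U]) => VD2): Map[Long, VD2] =
    vertices.map { case (id, attr) => id -> mapFunc(id, attr, table.get(id)) }

  val joined: Map[Long, String] = joinVertices(extraData)((_, _, newAttr) => newAttr)
  val hasExtra: Map[Long, Boolean] = outerJoinVertices(extraData)((_, _, opt) => opt.isDefined)
}
```

Vertex 3 keeps "carol" after `joinVertices`, while `outerJoinVertices` turns every vertex into a Boolean flag.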
Advanced Graph Computations

Aggregate Messages:
● Collects and aggregates information from neighboring vertices.

Pregel API:
● Iterative computation framework for graph processing.
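In GraphX the Pregel operator is invoked as `graph.pregel(initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)`: in each superstep, edges send messages, messages to the same vertex are merged, and the vertex program updates each receiving vertex. The plain-Scala loop below is a simplified, single-machine imitation of that model (the `pregel` helper and `LocalEdge` type are hypothetical), used to compute single-source shortest paths on a made-up weighted graph. Unlike GraphX, it re-scans every edge in each superstep instead of only edges next to vertices that received messages.

```scala
object PregelSketch {
  final case class LocalEdge(srcId: Long, dstId: Long, weight: Double)

  def pregel[VD, Msg](vertices: Map[Long, VD], edges: Seq[LocalEdge], initialMsg: Msg, maxIterations: Int)(
      vprog: (Long, VD, Msg) => VD,
      sendMsg: (LocalEdge, VD, VD) => Option[(Long, Msg)],
      mergeMsg: (Msg, Msg) => Msg
  ): Map[Long, VD] = {
    // Superstep 0: every vertex receives the initial message.
    var state = vertices.map { case (id, attr) => id -> vprog(id, attr, initialMsg) }
    var iteration = 0
    var active = true
    while (active && iteration < maxIterations) {
      // Send messages along edges and merge them per destination vertex.
      val messages: Map[Long, Msg] = edges
        .flatMap(e => sendMsg(e, state(e.srcId), state(e.dstId)))
        .groupBy(_._1)
        .map { case (id, ms) => id -> ms.map(_._2).reduce(mergeMsg) }
      // Halt once no vertex receives a message.
      active = messages.nonEmpty
      state = state.map { case (id, attr) =>
        id -> messages.get(id).map(m => vprog(id, attr, m)).getOrElse(attr)
      }
      iteration += 1
    }
    state
  }

  // Shortest distances from vertex 1 (illustrative weighted graph).
  val edges: Seq[LocalEdge] =
    Seq(LocalEdge(1L, 2L, 4.0), LocalEdge(1L, 3L, 1.0), LocalEdge(3L, 2L, 2.0), LocalEdge(2L, 4L, 1.0))
  val initial: Map[Long, Double] =
    Map(1L -> 0.0, 2L -> Double.PositiveInfinity, 3L -> Double.PositiveInfinity, 4L -> Double.PositiveInfinity)

  val distances: Map[Long, Double] = pregel(initial, edges, Double.PositiveInfinity, maxIterations = 10)(
    vprog = (_, dist, newDist) => math.min(dist, newDist),
    sendMsg = (e, srcDist, dstDist) =>
      if (srcDist + e.weight < dstDist) Some(e.dstId -> (srcDist + e.weight)) else None,
    mergeMsg = (a, b) => math.min(a, b)
  )
}
```

The path 1 → 3 → 2 (cost 3.0) beats the direct edge 1 → 2 (cost 4.0), and the loop halts once no distance improves.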
Neighborhood Aggregation in GraphX
● Neighborhood aggregation is central to many graph analytics tasks.
● Examples include:
○ Counting followers.
○ Calculating average attributes (e.g., age of followers).
○ Iterative algorithms like PageRank, Shortest Path, and Connected
Components.
● GraphX replaced its earlier mapReduceTriplets operator with aggregateMessages for better performance.
AggregateMessages Operator
● Purpose: Core operation for neighborhood aggregation.
● Definition:

Components:
1. sendMsg: Map function to send messages via EdgeContext.
2. mergeMsg: Reduce function to aggregate messages.
3. tripletFields: Optional argument to optimize join strategies by specifying accessed fields.
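In GraphX the operator takes `sendMsg: EdgeContext[VD, ED, Msg] => Unit`, `mergeMsg: (Msg, Msg) => Msg`, and `tripletFields` (default `TripletFields.All`), and returns a `VertexRDD[Msg]`. To see that map/reduce shape without a cluster, here is a plain-Scala imitation. `LocalEdgeContext` is a hypothetical, cut-down stand-in for `EdgeContext` that omits vertex and edge attributes and the `tripletFields` optimization. The sketch computes in-degrees on a made-up graph and shows that vertices receiving no message are missing from the result.

```scala
import scala.collection.mutable.ArrayBuffer

object AggregateMessagesSketch {
  final case class LocalEdge(srcId: Long, dstId: Long)

  // Hypothetical stand-in for EdgeContext: collects messages sent to either endpoint.
  final class LocalEdgeContext[Msg](val srcId: Long, val dstId: Long) {
    val sent: ArrayBuffer[(Long, Msg)] = ArrayBuffer.empty[(Long, Msg)]
    def sendToSrc(msg: Msg): Unit = sent += (srcId -> msg)
    def sendToDst(msg: Msg): Unit = sent += (dstId -> msg)
  }

  // Map phase: run sendMsg on every edge. Reduce phase: merge messages per target vertex.
  // Vertices that receive no message do not appear in the result.
  def aggregateMessages[Msg](edges: Seq[LocalEdge])(
      sendMsg: LocalEdgeContext[Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg
  ): Map[Long, Msg] = {
    val messages = edges.flatMap { e =>
      val ctx = new LocalEdgeContext[Msg](e.srcId, e.dstId)
      sendMsg(ctx)
      ctx.sent
    }
    messages.groupBy(_._1).map { case (id, ms) => id -> ms.map(_._2).reduce(mergeMsg) }
  }

  val edges: Seq[LocalEdge] = Seq(LocalEdge(1L, 2L), LocalEdge(3L, 2L), LocalEdge(2L, 4L))

  // In-degree: each edge sends 1 to its destination, and messages are summed.
  val inDegrees: Map[Long, Int] = aggregateMessages[Int](edges)(_.sendToDst(1), _ + _)
}
```

Vertices 1 and 3 have no incoming edges, so they are absent from `inDegrees` rather than mapped to 0.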

Key Features
● Explicit user control over accessed fields (TripletFields).
● Returns VertexRDD[Msg] containing aggregated messages.
● Vertices without messages are excluded.
● Optimized for constant-sized messages.
Example: Average Age of Older Followers
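import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators
// Random graph whose vertex property ("age") is the vertex ID as a Double
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices((id, _) => id.toDouble)
// For each vertex, count older followers and sum their ages
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => { // sendMsg
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send (counter, age) to the destination vertex
      triplet.sendToDst((1, triplet.srcAttr))
    }
  },
  (a, b) => (a._1 + b._1, a._2 + b._2) // mergeMsg: add counters and ages
)
// Divide the total age by the number of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues((id, value) => value match { case (count, totalAge) => totalAge / count })
avgAgeOfOlderFollowers.collect.foreach(println)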
Legacy Operator: mapReduceTriplets
Replaced by aggregateMessages due to:
● Inefficiency of iterator-based message aggregation.
● Limited optimization capabilities.
val graph: Graph[Int, Float] = ...

// mapReduceTriplets (legacy): the map function returns an Iterator of (target vertex, message).
// The two versions below are alternatives, not meant to run in the same scope.
def msgFun(triplet: EdgeTriplet[Int, Float]): Iterator[(VertexId, String)] =
  Iterator((triplet.dstId, "Hi"))
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)

// aggregateMessages (replacement): messages are sent through the EdgeContext
def msgFun(triplet: EdgeContext[Int, Float, String]): Unit =
  triplet.sendToDst("Hi")
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)


Common Aggregation Tasks
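1. Degree Information:
// Keep whichever (VertexId, degree) pair has the larger degree
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
  if (a._2 > b._2) a else b
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)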
2. Collecting Neighbors:
val neighborIds: VertexRDD[Array[VertexId]] =
graph.collectNeighborIds(EdgeDirection.Out)
val neighbors: VertexRDD[Array[(VertexId, VD)]] =
graph.collectNeighbors(EdgeDirection.In)
Collecting Neighbors

In some cases it may be easier to express computation by collecting neighboring vertices and their attributes at each vertex. This can be easily accomplished using the collectNeighborIds and the collectNeighbors operators.
class GraphOps[VD, ED] {
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
}
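A local, plain-Scala sketch of what these aggregation tasks compute on a small made-up directed graph: out-neighbor IDs per vertex (analogous to `collectNeighborIds(EdgeDirection.Out)`), total degree per vertex, and the `max` reduction used above. In this sketch, a vertex with no out-edges gets an empty array, while the degree map omits vertices with no edges at all, as GraphX's `degrees` does.

```scala
object NeighborhoodSketch {
  // Directed edges as (srcId, dstId) pairs (illustrative data); vertex 4 is isolated.
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (1L, 3L), (2L, 3L), (3L, 1L))
  val vertexIds: Seq[Long] = Seq(1L, 2L, 3L, 4L)

  // Out-neighbor IDs of every vertex; vertices without out-edges get an empty array.
  val outNeighborIds: Map[Long, Array[Long]] = vertexIds.map { v =>
    v -> edges.collect { case (src, dst) if src == v => dst }.toArray
  }.toMap

  // Degree = number of incident edges; isolated vertices do not appear.
  val degrees: Map[Long, Int] = edges
    .flatMap { case (src, dst) => Seq(src, dst) }
    .groupBy(identity)
    .map { case (v, occurrences) => v -> occurrences.size }

  // Same reduction as the `max` helper above.
  def max(a: (Long, Int), b: (Long, Int)): (Long, Int) = if (a._2 > b._2) a else b
  val maxDegree: (Long, Int) = degrees.toSeq.sortBy(_._1).reduce(max)
}
```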
Distributed Graphs

GraphX adopts a vertex-cut approach to distributed graph partitioning.
● GraphX partitions the graph along vertices which can reduce both the
communication and storage overhead.
● Logically, this corresponds to assigning edges to machines and
allowing vertices to span multiple machines.
● The exact method of assigning edges depends on the PartitionStrategy
and there are several tradeoffs to the various heuristics.
● Users can choose between different strategies by repartitioning the
graph with the Graph.partitionBy operator.
● The default partitioning strategy is to use the initial partitioning of the
edges as provided on graph construction.
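The vertex-cut idea can be sketched locally: every edge lands in exactly one partition, and a vertex is copied into each partition that holds one of its edges. The hash below is a made-up stand-in (GraphX's actual strategies, selected through `PartitionStrategy`, use their own schemes), and the graph is illustrative. The sketch measures how many partitions each vertex spans, which is the replication that drives communication and storage cost.

```scala
object VertexCutSketch {
  // Directed edges (illustrative data); vertex 1 is a hub.
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (1L, 3L), (1L, 4L), (1L, 5L), (2L, 3L), (4L, 5L))
  val numPartitions = 2

  // Assign each edge to exactly one partition using a made-up hash of its endpoints.
  def partitionOf(e: (Long, Long)): Int = ((e._1 * 31L + e._2) % numPartitions).toInt

  val edgePartitions: Map[Int, Seq[(Long, Long)]] = edges.groupBy(partitionOf)

  // A vertex is replicated to every partition that holds one of its edges.
  val replicas: Map[Long, Set[Int]] = edges
    .flatMap(e => Seq(e._1 -> partitionOf(e), e._2 -> partitionOf(e)))
    .groupBy(_._1)
    .map { case (v, ps) => v -> ps.map(_._2).toSet }

  // Average copies per vertex: the replication factor partition strategies try to keep low.
  val replicationFactor: Double = replicas.values.map(_.size).sum.toDouble / replicas.size
}
```

Here the hub vertex 1 is split across both partitions, while vertices 2 and 4 live in only one.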
Graph Algorithms
PageRank Algorithm

● What is PageRank?
○ Measures the importance of each vertex in a graph based on the
edges (or connections).
○ Works on the premise that an edge from node u to node v
represents an endorsement of v’s importance by u.
○ Example: A Twitter user with many followers will have a higher
rank (importance).

● Types of PageRank in GraphX:
○ Static PageRank: Runs for a fixed number of iterations.
○ Dynamic PageRank: Runs until the ranks converge (i.e., stop changing by more than a specified tolerance).
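The two variants differ only in their stopping rule, which a plain-Scala power iteration makes explicit. This sketch uses the textbook normalized update with damping factor 0.85, so ranks sum to 1 on a graph where every vertex has an out-edge. GraphX's implementation is parameterized by a reset probability (default 0.15) and may scale ranks differently, so the sketch illustrates the iteration, not GraphX's exact numbers. The graph is made up.

```scala
object PageRankSketch {
  // Directed edges (illustrative data); every vertex has at least one out-edge.
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 3L), (3L, 1L), (4L, 3L))
  val vertices: Seq[Long] = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
  val outDegree: Map[Long, Int] = edges.groupBy(_._1).map { case (v, es) => v -> es.size }
  val damping = 0.85

  // One update: PR(v) = (1 - d) / N + d * sum of PR(u) / outDeg(u) over edges u -> v.
  def step(ranks: Map[Long, Double]): Map[Long, Double] = {
    val n = vertices.size
    val contributions: Map[Long, Double] = edges
      .map { case (src, dst) => dst -> ranks(src) / outDegree(src) }
      .groupBy(_._1)
      .map { case (v, cs) => v -> cs.map(_._2).sum }
    vertices.map(v => v -> ((1 - damping) / n + damping * contributions.getOrElse(v, 0.0))).toMap
  }

  val uniform: Map[Long, Double] = vertices.map(v => v -> 1.0 / vertices.size).toMap

  // Static variant: a fixed number of iterations.
  def staticPageRank(numIter: Int): Map[Long, Double] =
    (1 to numIter).foldLeft(uniform)((ranks, _) => step(ranks))

  // Dynamic variant: iterate until no rank changes by more than `tol`.
  def dynamicPageRank(tol: Double): Map[Long, Double] = {
    var ranks = uniform
    var delta = Double.MaxValue
    while (delta > tol) {
      val next = step(ranks)
      delta = vertices.map(v => math.abs(next(v) - ranks(v))).max
      ranks = next
    }
    ranks
  }
}
```

Vertex 3, with two incoming edges, ends with the highest rank; vertex 4, with none, keeps only the teleport share (1 - 0.85) / 4.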

Using PageRank in GraphX
● How to use PageRank:
○ PageRank methods: Static and dynamic implementations are available as methods on the PageRank object.
○ GraphOps: Exposes these algorithms as methods that can be called directly on a graph (e.g., graph.pageRank).


● Example Dataset:
○ Social Network Example:
■ Users are listed in data/graphx/users.txt.
■ Relationships between users are in data/graphx/followers.txt.
○ Goal: Compute the PageRank for each user based on their followers.
import org.apache.spark.graphx.GraphLoader
// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run dynamic PageRank until ranks change by less than the tolerance 0.0001
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))
What are Connected Components?

● A connected component is a subgraph in which any two vertices are connected by a path.
● Each connected component is labeled with the ID of its lowest-numbered vertex.
● Example: In a social network, connected components can represent clusters of users who are all connected directly or indirectly.
Connected Components in GraphX

● GraphX Implementation:
○ The algorithm is available in the ConnectedComponents object
in GraphX.
○ Labels each connected component in the graph with a unique ID
(the ID of the lowest-numbered vertex).

● How It Works:
○ The connected component algorithm runs on a graph and assigns
the same ID to all vertices in the same connected component.
○ Vertices with no edges are considered individual components.
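A sequential sketch of the labeling rule described above, using label propagation in plain Scala: each vertex starts with its own ID as its label, and every edge (treated as undirected) pulls both endpoints down to the smaller label until nothing changes. GraphX distributes a similar minimum-ID propagation with Pregel; this loop only illustrates the result on a made-up graph.

```scala
object ConnectedComponentsSketch {
  // Edges are treated as undirected (illustrative data); vertex 4 has no edges.
  val edges: Seq[(Long, Long)] = Seq((2L, 1L), (3L, 2L), (5L, 6L))
  val vertices: Seq[Long] = Seq(1L, 2L, 3L, 4L, 5L, 6L)

  // Repeatedly give both endpoints of each edge the smaller of their labels
  // until a full pass over the edges changes nothing.
  def components: Map[Long, Long] = {
    var labels: Map[Long, Long] = vertices.map(v => v -> v).toMap
    var changed = true
    while (changed) {
      changed = false
      for ((a, b) <- edges) {
        val m = math.min(labels(a), labels(b))
        if (labels(a) != m || labels(b) != m) {
          labels = labels.updated(a, m).updated(b, m)
          changed = true
        }
      }
    }
    labels
  }
}
```

Vertices 1 to 3 share label 1, vertices 5 and 6 share label 5, and the isolated vertex 4 forms its own component.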

import org.apache.spark.graphx.GraphLoader
// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
  case (id, (username, cc)) => (username, cc)
}
// Print the result
println(ccByUsername.collect().mkString("\n"))
Triangle Counting in GraphX

● What is Triangle Counting?
○ A vertex is part of a triangle if it has two adjacent vertices with an edge between them.
○ Triangle counting provides a measure of clustering by counting how many triangles pass through a vertex.

● GraphX Implementation:
○ The triangle counting algorithm is available in the TriangleCount object in GraphX.
○ The algorithm counts the number of triangles that pass through each vertex.

● How It Works:
○ A triangle is formed by three vertices that are pairwise connected, i.e., each pair of them shares an edge.
○ Triangle counting is a measure of clustering in the graph, indicating how tightly vertices group together.
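The per-vertex count can be sketched on a single machine: for each vertex, count the pairs of its neighbors that are themselves connected. This plain-Scala sketch treats edges as undirected and uses a made-up four-vertex graph; it illustrates the quantity that `triangleCount` reports, not GraphX's distributed implementation.

```scala
object TriangleCountSketch {
  // Undirected edges (illustrative data): 1-2-3 form a triangle; 3-4 is a pendant edge.
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 3L), (1L, 3L), (3L, 4L))

  // Neighbor sets, with each edge stored in both directions.
  val neighbors: Map[Long, Set[Long]] = edges
    .flatMap { case (a, b) => Seq(a -> b, b -> a) }
    .groupBy(_._1)
    .map { case (v, ps) => v -> ps.map(_._2).toSet }

  // A triangle through v is a pair of v's neighbors that are adjacent to each other.
  // Each such pair appears twice, as (u, w) and (w, u), so divide by 2.
  def trianglesThrough(v: Long): Int = {
    val nbrs = neighbors.getOrElse(v, Set.empty[Long]).toSeq
    val closedPairs = for {
      u <- nbrs
      w <- nbrs
      if u != w && neighbors(u).contains(w)
    } yield (u, w)
    closedPairs.size / 2
  }
}
```

Vertices 1, 2, and 3 each lie on the one triangle; vertex 4 lies on none.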
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader
  .edgeListFile(sc, "data/graphx/followers.txt", canonicalOrientation = true)
  .partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val triCountByUsername = users.join(triCounts).map {
  case (id, (username, tc)) => (username, tc)
}
// Print the result
println(triCountByUsername.collect().mkString("\n"))
