ECS640U/ECS765P Big Data Processing
Large-Scale Graph Processing
Lecturer: Ahmed M. A. Sayed
School of Electronic Engineering and Computer Science
Credit: Joseph Doyle, Jesus Carrion, Felix Cuadrado, …
Weeks 6-11: Processing
Pipeline: Data Sources → Ingestion → Storage → Processing → Output
This week, we will focus on Graph Processing
Big Data Processing: Week 9
Topic List:
● Graph Applications
● Graph Databases
● Graph Databases with Python
● Pregel
● GraphX
Graph Definition
A graph G = (V,E), where
• V represents the set of vertices (nodes)
• E represents the set of edges (links)
• Both vertices and edges may contain additional information
Different types of graphs:
• Directed vs. undirected edges
• Cyclic vs Acyclic
• Temporal Graphs
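A minimal Python sketch of the definition above, assuming a small hypothetical social graph. Vertices and edges are kept in dictionaries so that both can carry additional information; the names `vertices`, `edges`, `out_neighbours` and `as_undirected` are illustrative and do not come from any library.

```python
# Vertices V: id -> properties (hypothetical example data).
vertices = {
    "alice": {"age": 31},
    "bob": {"age": 27},
    "carol": {"age": 45},
}

# Directed edges E: (source, destination) -> properties.
edges = {
    ("alice", "bob"): {"type": "follows"},
    ("bob", "carol"): {"type": "follows"},
    ("carol", "alice"): {"type": "follows"},
}


def out_neighbours(vertex):
    """Return the vertices reachable from `vertex` over one outgoing edge."""
    return sorted(dst for (src, dst) in edges if src == vertex)


def as_undirected(directed_edges):
    """Drop edge direction by storing each edge as an unordered pair."""
    return {frozenset(pair) for pair in directed_edges}
```

The example graph is directed and cyclic (alice → bob → carol → alice); `as_undirected` shows how the same edge set can be read as an undirected graph.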
Graphs are ubiquitous
Modeling and tracking interactions
Social Graphs
Social media defines interaction networks
• Contacts
• Messages
• Tags
Graph analysis is useful for extracting valuable information
• Identify leaders in a community
• Measure influence (centrality)
• Identify “special” nodes and communities
• Find the right fitness Instagram influencer to advertise your protein powder
Community Detection
Community detection, also called graph partitioning
• Helps us to reveal the hidden relations among the nodes in the network.
• Many algorithms have been developed to detect communities
Communities of college football network, using colors for conferences and spatial clustering for identified communities
https://www.ese.wustl.edu/~nehorai/research/network_science/Lu_Community_Detection_SR_2018.html
Bipartite graphs
Bipartite: the nodes are partitioned into two groups, and edges only connect nodes from different groups
https://en.wikipedia.org/wiki/Bipartite_graph
Example: the “stable marriage/matching” problem: find a stable matching between two equally sized
sets of elements, given each element's ordered preferences. A matching is a bijection from the
elements of one set to the elements of the other set https://en.wikipedia.org/wiki/Stable_marriage_problem
Not stable: a matching is unstable if there is an element A of the first set that prefers some element B of the
second set over its current partner, and B also prefers A over its current partner.
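The instability condition can be checked mechanically. The sketch below is one possible way to do it, assuming preferences are given as lists ordered from most to least preferred; the function names and the two-element example sets are hypothetical.

```python
def blocking_pairs(matching, prefs_a, prefs_b):
    """Return all pairs (a, b) that prefer each other over their current partners.

    matching: dict mapping each element of the first set to its partner.
    prefs_a / prefs_b: dict mapping each element to its preference list,
    most preferred first.
    """
    partner_of_b = {b: a for a, b in matching.items()}
    pairs = []
    for a, ranked in prefs_a.items():
        current = matching[a]
        # Only the elements that a ranks above its current partner matter.
        for b in ranked[: ranked.index(current)]:
            rival = partner_of_b[b]
            # b must also rank a above its own current partner.
            if prefs_b[b].index(a) < prefs_b[b].index(rival):
                pairs.append((a, b))
    return pairs


def is_stable(matching, prefs_a, prefs_b):
    """A matching is stable when no blocking pair exists."""
    return not blocking_pairs(matching, prefs_a, prefs_b)


# Hypothetical preferences for two sets of size 2.
prefs_a = {"A1": ["B1", "B2"], "A2": ["B1", "B2"]}
prefs_b = {"B1": ["A1", "A2"], "B2": ["A1", "A2"]}
```

With these preferences, the matching `{"A1": "B2", "A2": "B1"}` is not stable because A1 and B1 prefer each other, while `{"A1": "B1", "A2": "B2"}` is stable.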
Practical applications: Web advertising, click prediction
Contagion/epidemic networks
How quickly will COVID-19 spread on this graph?
Contact tracing and analysis of epidemic spreading
“Needle exchange” networks of drug users [Weeks et al. 2002]
Interesting Properties – Power Law
The Power Law in the degree distribution (or popularity)
• A small minority of nodes has a very high degree (high influence)
• Also called Scale-Free Networks
• Quite common in many human/social networks
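Central nodes (influencers) have a high degree
[Figure: degree distributions, frequency vs. degree]
• Scale-free: the degree distribution follows a power law
• Random: roughly homogeneous degree distribution (most nodes have a degree close to the average)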
https://en.wikipedia.org/wiki/Scale-free_network
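To make the notion of a degree distribution concrete, here is a small sketch that counts how many vertices have each degree. The edge list is a hypothetical toy graph with one hub; it is far too small to show a real power law and only illustrates the computation.

```python
from collections import Counter


def degree_distribution(edge_list):
    """Map each degree value to the number of vertices having that degree."""
    degree = Counter()
    for u, v in edge_list:
        # Undirected view: every edge adds one to the degree of both endpoints.
        degree[u] += 1
        degree[v] += 1
    return Counter(degree.values())


# Hypothetical graph: one hub connected to five vertices, plus one extra edge.
edge_list = [("hub", n) for n in ["a", "b", "c", "d", "e"]] + [("a", "b")]
distribution = degree_distribution(edge_list)
```

Plotting frequency against degree for a large social graph typically gives the skewed shape described above: many low-degree vertices and very few hubs.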
Graph Management / Storage
Traditional (relational) and NoSQL databases can store graphs
But their query languages do not natively support queries over graph elements (e.g., checking for relations)
We need query languages/abstractions suitable for finding relationship patterns
Rise of graph DBs
• Neo4j : Graph database management system (Java-based, not distributed)
• Titan: Distributed Graph Database
• Amazon Neptune (Nov 2017) : Fully Managed Graph Database
Graph Databases
Database that uses graph structures with nodes, edges and properties to store data
Provides index-free adjacency
• Every node holds direct pointers to its adjacent elements
• Fast for following relationships
Edges hold most of the important information and connect:
• nodes to other nodes
• nodes to properties/metadata (Resource Description Framework - RDF)
Neo4j
Java-based graph database management system
Like SQL databases, it is ACID-compliant: transactions (logical units of work) are Atomic, Consistent, Isolated
and Durable (https://en.wikipedia.org/wiki/ACID)
Property graph model: powerful schema-less way to model graph-based information
Good performance for non-massive datasets
Not distributed, but can be sharded (partitioned)
Cypher - Graph-specific query language
• ASCII-art syntax for defining and matching patterns
The property graph model
Entities – Vertices and Edges
Tags – Entities have type(s)
Properties – Key value pairs attached to entities
The property graph model: Books
[Figure: example book graph with its tags, properties and entities labelled]
Why is SQL not suitable for dealing with graph-based data?
SQL: Modelling and Querying a Graph
Relationship graph between account holders
Example query: get the non-immediate friends of Person001 who are up to 3 hops away
Imagine using such cumbersome SQL to query large social network graphs!
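For comparison, the same "friends up to 3 hops away" question is a short traversal once the data is held as a graph. The sketch below uses a plain breadth-first search over a hypothetical in-memory friendship graph; the person IDs other than Person001 and the function name `friends_within` are made up for illustration.

```python
from collections import deque


def friends_within(graph, start, max_hops):
    """Breadth-first search returning {person: hops} for 2 <= hops <= max_hops."""
    hops = {start: 0}
    queue = deque([start])
    while queue:
        person = queue.popleft()
        if hops[person] == max_hops:
            continue  # do not expand beyond the hop limit
        for friend in graph.get(person, ()):
            if friend not in hops:
                hops[friend] = hops[person] + 1
                queue.append(friend)
    # Keep only non-immediate friends (more than one hop away).
    return {p: h for p, h in hops.items() if h >= 2}


# Hypothetical undirected friendship graph as adjacency lists.
friends = {
    "Person001": ["Person002"],
    "Person002": ["Person001", "Person003"],
    "Person003": ["Person002", "Person004"],
    "Person004": ["Person003", "Person005"],
    "Person005": ["Person004"],
}
result = friends_within(friends, "Person001", 3)
```

In a relational schema the same traversal usually needs one self-join per hop, which is why the SQL version grows so quickly.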
Cypher query language
Query Language for Neo4j
• Becoming standard through OpenCypher initiative (https://opencypher.org)
Declarative and Expressive language
MATCH queries return all the graph elements that satisfy the given pattern
Sample Cypher query on a graph
*..5 => variable-length path of up to 5 relationships (hops)
Movies Database neo4j
Install neo4j: https://neo4j.com/docs/operations-manual/current/installation/
Download desktop edition: https://neo4j.com/download/
Open the Movies project in the desktop and then use command :play movies
Then follow the instructions for creating movies database and queries
Movies Database neo4j
Find movies released in the 1990s
Find actors up to 4 hops away from Kevin Bacon
Find the shortest path between two actors
Find the co-co-actors of Tom Hanks (co-actors of his co-actors)
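The four tasks can be written as Cypher text along the following lines. These are sketches that assume the schema of Neo4j's Movies example (Person and Movie nodes, ACTED_IN relationships, `name`, `title` and `released` properties); they are not necessarily the exact queries used in the lecture, and the dictionary name `MOVIE_QUERIES` and the parameters `$first` and `$second` are hypothetical. They are kept as Python strings so they can be passed to the Neo4j Python library.

```python
# Cypher text for the four tasks; treat these as sketches to adapt.
MOVIE_QUERIES = {
    # Range filter on the release year.
    "movies_1990s": (
        "MATCH (m:Movie) "
        "WHERE m.released >= 1990 AND m.released < 2000 "
        "RETURN m.title"
    ),
    # Variable-length pattern: between 1 and 4 relationships in any direction.
    "bacon_4_hops": (
        "MATCH (bacon:Person {name: 'Kevin Bacon'})-[*1..4]-(other:Person) "
        "RETURN DISTINCT other.name"
    ),
    # shortestPath over relationships of any type; names passed as parameters.
    "shortest_path": (
        "MATCH p = shortestPath("
        "(a:Person {name: $first})-[*]-(b:Person {name: $second})) "
        "RETURN p"
    ),
    # Co-actors of co-actors who have not acted with Tom Hanks directly.
    "co_co_actors": (
        "MATCH (tom:Person {name: 'Tom Hanks'})-[:ACTED_IN]->(:Movie)"
        "<-[:ACTED_IN]-(co:Person)-[:ACTED_IN]->(:Movie)"
        "<-[:ACTED_IN]-(coco:Person) "
        "WHERE coco <> tom "
        "AND NOT (tom)-[:ACTED_IN]->(:Movie)<-[:ACTED_IN]-(coco) "
        "RETURN DISTINCT coco.name"
    ),
}
```

Each pattern reads like a small drawing of the subgraph being searched for: round brackets are nodes and square brackets with arrows are relationships.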
Break and Quiz
Neo4j Python Library
Install the Neo4j Python library:
pip install neo4j
The library lets Python code access a Neo4j database
For example: https://github.com/neo4j-examples/movies-python-bolt
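A minimal sketch of accessing a database through the library, assuming a running Neo4j instance loaded with the Movies data. The function name `movie_titles`, the query and the connection details in the usage note are assumptions for illustration; the driver is imported inside the function only so that the snippet can be loaded on machines where the library is not installed.

```python
TITLE_QUERY = "MATCH (m:Movie) RETURN m.title AS title ORDER BY title LIMIT $limit"


def movie_titles(uri, user, password, limit=5):
    """Return up to `limit` movie titles from a Neo4j Movies database."""
    # Imported here so the module loads even where the driver is not installed.
    from neo4j import GraphDatabase

    driver = GraphDatabase.driver(uri, auth=(user, password))
    try:
        with driver.session() as session:
            # Keyword arguments are sent as Cypher parameters ($limit).
            result = session.run(TITLE_QUERY, limit=limit)
            return [record["title"] for record in result]
    finally:
        driver.close()
```

A call might look like `movie_titles("bolt://localhost:7687", "neo4j", "<password>")`, where the URI and credentials are placeholders for your own installation.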
Neo4j Python Obtaining JSON Graph
Obtain a JSON representation of the graph in the movies database, showing movie titles and their actors/cast
Neo4j Python Search Functionality
Search the database for movies whose titles contain the sub-text given by the variable q
Regular-expression annotations: case-sensitivity flag, any character (.), zero or more times (*)
https://neo4j.com/docs/cypher-manual/current/clauses/where/#query-where-regex
https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/regex/Pattern.html
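The effect of such a pattern can be tried locally. The sketch below uses Python's `re` module purely as an illustration; Neo4j itself evaluates regular expressions with Java's syntax, which shares the `(?i)`, `.` and `*` constructs used here. The helper `title_pattern` and the sample titles are hypothetical.

```python
import re


def title_pattern(q):
    """Build a case-insensitive 'contains q' pattern; q is escaped so it matches literally."""
    # (?i) = ignore case, .* = any character zero or more times.
    return "(?i).*" + re.escape(q) + ".*"


titles = ["The Matrix", "The Matrix Reloaded", "Top Gun"]  # hypothetical sample
# fullmatch mirrors whole-string matching, as a regex comparison in a WHERE clause does.
matches = [t for t in titles if re.fullmatch(title_pattern("matrix"), t)]
```

Without the leading and trailing `.*` the pattern would only match a title that is exactly equal to `q`.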
Graph Traversal in MapReduce
Approach: Parallel processing of each vertex
● Each Map/Reduce function has access to limited info
One node and its links
Iterative executions of a MapReduce job
● Map: compute something on each node, potentially emitting information addressed to that node or to
other nodes; the Reducers aggregate it.
● Reducers: compute something on each unique node
● The output of the reducers in iteration #n becomes the input of the mappers in iteration #n+1
Finding the Shortest Path: Intuition
Breadth-First Search (BFS) algorithm (https://en.wikipedia.org/wiki/Breadth-first_search)
We can define the solution to this problem via induction:
● distanceTo(startNode) = 0
● For all nodes n directly reachable from startNode → distanceTo(n) = 1
● For all nodes n reachable from some other set of nodes S,
distanceTo(n) = 1 + min(distanceTo(m)) over all m ∈ S that have an edge to n
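The induction above maps onto iterated MapReduce jobs. The following is a single-machine sketch of that idea, not a Hadoop program: `bfs_map`, `bfs_reduce`, `bfs_iteration` and `parallel_bfs` are hypothetical names, the shuffle is simulated with a dictionary, and the example graph is made up.

```python
import math
from collections import defaultdict


def bfs_map(node, record):
    """Emit the node's own record plus a candidate distance for each neighbour."""
    distance, neighbours = record
    yield node, ("graph", neighbours)  # pass the graph structure along
    yield node, ("dist", distance)
    if distance != math.inf:
        for m in neighbours:
            yield m, ("dist", distance + 1)


def bfs_reduce(node, values):
    """Keep the adjacency list and the smallest distance seen for this node."""
    neighbours, best = [], math.inf
    for kind, payload in values:
        if kind == "graph":
            neighbours = payload
        else:
            best = min(best, payload)
    return best, neighbours


def bfs_iteration(state):
    """One MapReduce job: map every node, shuffle by key, reduce every key."""
    shuffled = defaultdict(list)
    for node, record in state.items():
        for key, value in bfs_map(node, record):
            shuffled[key].append(value)
    return {node: bfs_reduce(node, values) for node, values in shuffled.items()}


def parallel_bfs(adjacency, start):
    """Repeat the job until no distance changes, then return node -> distance."""
    state = {n: (0 if n == start else math.inf, nbrs) for n, nbrs in adjacency.items()}
    while True:
        new_state = bfs_iteration(state)
        if new_state == state:  # converged
            return {n: d for n, (d, _) in new_state.items()}
        state = new_state


adjacency = {"A": ["B", "C"], "B": ["D"], "C": ["D"], "D": ["E"], "E": []}
distances = parallel_bfs(adjacency, "A")
```

Note that every mapper re-emits the adjacency list so that the next iteration still has the graph structure available.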
Visualizing Parallel BFS
Inefficient: need to keep track of the list of visited nodes and pass it between jobs, along with the updated graph state/structure
MapReduce graph processing performance
Iterative algorithms involve HDFS writing in each step
● Resending the graph structure in each iteration is VERY inefficient
One Map task per node, and sending of messages to other nodes depending on connections between
graph nodes results in significant communications cost.
In-memory systems are a much better fit for this type of computation → Spark framework
● Graph-specific in-memory systems have been developed recently
More Efficient Alternatives
Google’s Pregel
● Original Google paper
● Google’s Pregel model: Think like a vertex [1]
Apache Giraph
● Java-based
Apache Spark GraphX
● Extension of Spark with Graph-centric computation model (Scala)
● GraphFrames for Python API (used for this week’s lab)
… Ongoing research efforts in this space
[1] Malewicz, G., Austern, M. H., Bik, A. J., Dehnert, J. C., Horn, I., Leiser, N., & Czajkowski, G. (2010, June). Pregel: a system for large-scale graph
processing. In Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data.
Pregel: Think like a vertex
The Pregel framework allows you to write “vertex-centric” code.
The same user code, a compute() function, is run concurrently
on each vertex of the graph.
Each instance of this function
1. keeps track of information
2. can iterate over outgoing edges (each of which has a value)
3. can send messages to the vertices connected to those edges
or to any other vertices it may know about (e.g., having
received a vertex ID via a message)
Bulk Synchronous Parallel (BSP)
https://people.cs.rutgers.edu/~pxk/417/notes/pregel.html
Pregel’s node/vertex-centric processing model
Pregel-style graph processing systems
Computation is iterative but in the form of supersteps
● In every iteration (superstep), a function is executed at each vertex
Vertices can send messages to their neighbours
Messages arrive in the next superstep
Computation is executed in parallel
● Each vertex is independent from the rest in the same step
● Messages are the synchronization mechanism
https://people.cs.rutgers.edu/~pxk/417/notes/pregel.html
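A single-process sketch of the superstep model, assuming a simple task: propagating the maximum vertex value to every vertex. It is not Pregel's actual API; `pregel_max`, the inbox/outbox dictionaries and the four-vertex graph are hypothetical, and the per-vertex loop stands in for the function that would run in parallel.

```python
def pregel_max(values, out_edges):
    """Propagate the maximum vertex value through the graph in supersteps."""
    values = dict(values)
    # Superstep 0: every vertex sends its value along its outgoing edges.
    inbox = {v: [] for v in values}
    for v, val in values.items():
        for dst in out_edges.get(v, ()):
            inbox[dst].append(val)
    supersteps = 1
    while any(inbox.values()):
        outbox = {v: [] for v in values}
        # The vertex function runs independently at each vertex with messages.
        for v, messages in inbox.items():
            if messages and max(messages) > values[v]:
                values[v] = max(messages)
                for dst in out_edges.get(v, ()):
                    outbox[dst].append(values[v])
        inbox = outbox  # barrier: messages are delivered in the next superstep
        supersteps += 1
    return values, supersteps


# Hypothetical directed graph with one initial value per vertex.
initial = {"a": 3, "b": 6, "c": 2, "d": 1}
out_edges = {"a": ["b"], "b": ["a", "d"], "c": ["b", "d"], "d": ["c"]}
final_values, supersteps = pregel_max(initial, out_edges)
```

The computation stops once a superstep produces no messages, which here happens when every vertex already holds the maximum.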
Google’s PageRank
PageRank is a link analysis algorithm
The rank value indicates the importance of a particular web page
A hyperlink to a page counts as a vote of support
A page that is linked to by many pages with high PageRank receives a high rank itself
Example: A PageRank of 0.5 means there is a 50% chance that a person clicking on a random link
will be directed to the document with a PageRank of 0.5
Page, L., Brin, S., Motwani, R., & Winograd, T. (1999). The PageRank citation ranking: bringing order to the web., WWW
PageRank Example
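r_{k+1}(P) = sum over all pages Q linking to P of r_k(Q) / d(Q)
• r_k(Q): rank of the neighbor Q after iteration k
• d(Q): outdegree of the neighbor Q
• Initial value: r_0(P) = 1 / N, where N is the number of pages
r_1(P2) = r_0(P3)/d(P3) + r_0(P1)/d(P1) = (1/6)/3 + (1/6)/2 = 1/18 + 1/12 = 30/216 = 5/36
r_2(P2) = r_1(P3)/d(P3) + r_1(P1)/d(P1) = (1/12)/3 + (1/18)/2 = 1/36 + 1/36 = 1/18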
https://en.wikipedia.org/wiki/PageRank
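The update used in the example (each page receives the rank of every neighbor linking to it, divided by that neighbor's outdegree) can be sketched as follows. The four-page graph is hypothetical and is not the graph pictured in the lecture; the sketch also assumes that every page has at least one outgoing link and omits the damping factor used in practical PageRank implementations.

```python
from fractions import Fraction


def pagerank_step(ranks, out_links):
    """One iteration: each page Q gives r(Q) / d(Q) to every page it links to."""
    new_ranks = {page: Fraction(0) for page in ranks}
    for q, targets in out_links.items():
        share = ranks[q] / len(targets)  # rank of the neighbour / its outdegree
        for p in targets:
            new_ranks[p] += share
    return new_ranks


# Hypothetical 4-page web graph: page -> pages it links to.
out_links = {"A": ["B", "C"], "B": ["C"], "C": ["A"], "D": ["C"]}
r0 = {page: Fraction(1, len(out_links)) for page in out_links}  # initial value 1/N
r1 = pagerank_step(r0, out_links)
```

Exact fractions are used so that the results can be compared with hand calculations like the ones above; the ranks still sum to 1 after each step.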
Spark GraphX
Spark’s library for graph processing
Provides specialized RDDs for representing graph structure, as well as its information (property graphs)
Provides methods for creating graphs, transforming them, and computing multiple common graph metrics
and algorithms
GraphX is written in Scala → GraphFrames is the Python library for using Spark’s graph processing
Spark GraphX Property Graphs
Spark GraphX RDD
Holds graph data and provides methods for manipulating them
VertexRDD[VertexData]: an RDD of (VertexId, VertexData) pairs
Vertex IDs (VertexId) must be 64-bit Long values
VertexData holds the vertex properties
EdgeRDD[EdgeData]
Each edge holds its source and destination IDs plus the edge properties (EdgeData)
Technically a directed graph
Triplets
Join of source vertex, destination vertex, and edge
GraphX predefined methods
A Graph RDD has multiple convenience methods that provide access to its information and implement
relevant operations
● Access to RDDs with the property information
graph.vertices, graph.edges, graph.triplets
● Provides a tuple with (vertexId, degree of each vertex)
graph.degrees
● Obtains each of the connected components of the graph
graph.connectedComponents
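To show what these two methods compute, here is a plain-Python sketch on a tiny hypothetical edge list. It does not use Spark or GraphX; the function names are illustrative. Like GraphX's connected components, the sketch labels each vertex with the smallest vertex ID in its component.

```python
from collections import Counter


def degrees(edge_list):
    """Map each vertex to its degree, counting incoming and outgoing edges."""
    deg = Counter()
    for src, dst in edge_list:
        deg[src] += 1
        deg[dst] += 1
    return dict(deg)


def connected_components(vertex_ids, edge_list):
    """Label every vertex with the smallest vertex id in its component."""
    label = {v: v for v in vertex_ids}
    changed = True
    while changed:  # repeat until no label can be lowered any further
        changed = False
        for src, dst in edge_list:
            low = min(label[src], label[dst])
            if label[src] != low or label[dst] != low:
                label[src] = label[dst] = low
                changed = True
    return label


vertex_ids = [1, 2, 3, 4, 5]
edge_list = [(1, 2), (2, 3), (4, 5)]
component_of = connected_components(vertex_ids, edge_list)
```

Vertices 1, 2 and 3 end up with label 1, and vertices 4 and 5 with label 4, so the graph has two connected components.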
GraphX predefined methods
v._2 is the second field of each vertex tuple (the property column); v._2._2 is the second field of that property tuple (the position of the person)
graph.vertices.map(v => v._2._2).collect() // returns (student, postdoc, professor, professor)
graph.edges.filter(e => e.attr == "PI").count() // Edge is a case class (srcId, dstId, attr), so the property is e.attr; returns 1
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count() // counts all users that are postdocs; returns 1
https://spark.apache.org/docs/latest/graphx-programming-guide.html
Graph aggregate computation
Aggregate transformations send and process messages to every vertex through each edge
graph.aggregateMessages: This operator applies a user defined sendMsg function to each edge triplet in
the graph and then uses the mergeMsg function to aggregate those messages at their destination vertex.
The operation involves the following:
● sendMsg: EdgeContext[VD, ED, Msg] => Unit
Can send messages to either source or destination, using context (Same as Map in MapReduce)
● mergeMsg: (Msg, Msg) => Msg
All the received messages by a vertex are reduced into one (Same as Reduce in MapReduce)
Returns a VertexRDD of (vertexId, merged result) pairs
https://spark.apache.org/docs/latest/graphx-programming-guide.html#aggregate-messages-aggregatemessages
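Age of the oldest follower of each node
(Scala code)
import org.apache.spark.graphx._

// Assumes graph: Graph[Int, _], where each vertex attribute is the user's age
val oldestFollower: VertexRDD[Int] =
  graph.aggregateMessages[Int](
    // sendMsg: each follower (source) sends its age to the user it follows (destination)
    edge => edge.sendToDst(edge.srcAttr),
    // mergeMsg: keep the largest age received, e.g. max(23, 42) = 42
    (a, b) => math.max(a, b)
  )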
http://webprojects.eecs.qmul.ac.uk/ag316/notesSite/BDP_slides/Week7%20%7C%20BigGraphs/ECS640-9-BigGraphs.pdf
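The send/merge pattern of the oldest-follower example can also be emulated in a few lines of plain Python. This is only a sketch of the idea, not the GraphX or GraphFrames API: `aggregate_messages`, the triplet dictionary and the follower data are hypothetical.

```python
def aggregate_messages(vertices, edges, send_msg, merge_msg):
    """Apply send_msg to every edge triplet and merge the messages per vertex."""
    inbox = {}

    def send(vertex_id, msg):
        # Merge with what the vertex already received, or store the first message.
        inbox[vertex_id] = merge_msg(inbox[vertex_id], msg) if vertex_id in inbox else msg

    for src, dst, attr in edges:
        triplet = {"src": src, "dst": dst, "attr": attr,
                   "src_attr": vertices[src], "dst_attr": vertices[dst]}
        send_msg(triplet, send)
    return inbox


# Hypothetical follower graph: vertex attribute = age, edge = src follows dst.
ages = {1: 23, 2: 42, 3: 30}
follows = [(1, 3, "follows"), (2, 3, "follows"), (3, 1, "follows")]

oldest_follower = aggregate_messages(
    ages, follows,
    send_msg=lambda t, send: send(t["dst"], t["src_attr"]),  # send source age to destination
    merge_msg=max,                                           # keep the larger of two ages
)
```

Vertex 3 is followed by users aged 23 and 42, so its result is 42; vertex 2 has no followers and therefore does not appear in the result.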
End and Quiz