OpenMP programs, vector computation approaches, multithreading, MESI protocol, MPI, MapReduce
OpenMP is one of the most common parallel programming models in use today.
It is relatively easy to use, which makes it a great model to start with when learning to
write parallel software.
Concurrency vs. parallelism:
Definition. Concurrency: a condition of a system in which multiple tasks are logically
active at one time. Parallelism: a condition of a system in which multiple tasks are
actually active at one time.
Application. A concurrent application: one for which the computations logically execute
simultaneously due to the semantics of the application. A parallel application: one for
which the computations actually execute simultaneously in order to complete a problem
in less time.
Write a program that prints “hello world”.
#include <stdio.h>
int main()
{
    int ID = 0;
    printf(" hello(%d) ", ID);
    printf(" world(%d) \n", ID);
}
Write a multithreaded program that prints “hello world”.
#include <omp.h>
#include <stdio.h>
int main()
{
    #pragma omp parallel
    {
        int ID = 0;
        printf(" hello(%d) ", ID);
        printf(" world(%d) \n", ID);
    }
}
Write a multithreaded program where each thread prints “hello world”.
#include <omp.h>
#include <stdio.h>
int main()
{
    #pragma omp parallel
    {
        int ID = omp_get_thread_num();
        printf(" hello(%d) ", ID);
        printf(" world(%d) \n", ID);
    }
}
Thread Creation: Parallel Regions
#include <omp.h>
#include <stdio.h>
void pooh(int ID, double A[]);   // placeholder work function
double A[1000];
int main()
{
    #pragma omp parallel num_threads(4)
    {
        int ID = omp_get_thread_num();
        pooh(ID, A);   // each of the 4 threads runs this block with its own ID
    }
    printf("all done\n");   // printed once, after threads join at the implicit barrier
}
Parallel PI Program
#include <omp.h>
#include <stdio.h>
static long num_steps = 100000;
double step;
#define NUM_THREADS 2
int main() {
    int i, nthreads;
    double pi, sum[NUM_THREADS];
    step = 1.0 / (double) num_steps;
    omp_set_num_threads(NUM_THREADS);
    #pragma omp parallel
    {
        int i, id, nthrds;
        double x;
        id = omp_get_thread_num();
        nthrds = omp_get_num_threads();
        if (id == 0) {
            nthreads = nthrds;   // record how many threads the runtime actually gave us
        }
        for (i = id, sum[id] = 0.0; i < num_steps; i = i + nthrds) {
            x = (i + 0.5) * step;
            sum[id] += 4.0 / (1.0 + x * x);
        }
    }
    for (i = 0, pi = 0.0; i < nthreads; i++) {
        pi += sum[i] * step;
    }
    printf("Pi = %f\n", pi);
}
Eliminate false sharing by padding the sum array
False sharing occurs when independent variables used by different threads fall on the
same cache line; each update by one thread then invalidates the line in the other
threads' caches, forcing costly reloads. Padding spreads the per-thread sums onto
separate cache lines.
#include <omp.h>
#include <stdio.h>
static long num_steps = 100000;
double step;
#define PAD 8 // assume 64 byte L1 cache line size
#define NUM_THREADS 2
int main() {
    int i, nthreads;
    double pi, sum[NUM_THREADS][PAD];   // each thread's sum sits on its own cache line
    step = 1.0 / (double) num_steps;
    omp_set_num_threads(NUM_THREADS);
    #pragma omp parallel
    {
        int i, id, nthrds;
        double x;
        id = omp_get_thread_num();
        nthrds = omp_get_num_threads();
        if (id == 0) {
            nthreads = nthrds;
        }
        for (i = id, sum[id][0] = 0.0; i < num_steps; i = i + nthrds) {
            x = (i + 0.5) * step;
            sum[id][0] += 4.0 / (1.0 + x * x);
        }
    }
    for (i = 0, pi = 0.0; i < nthreads; i++) {
        pi += sum[i][0] * step;
    }
    printf("Pi = %f\n", pi);
}
Synchronization: bringing one or more threads to a well-defined and known point in their
execution.
The two most common forms of synchronization are:
1. Barrier: each thread waits at the barrier until all threads arrive.
2. Mutual exclusion: define a block of code that only one thread at a time can execute.
Barrier:
#pragma omp parallel
{
    int id = omp_get_thread_num();
    A[id] = big_calc1(id);
    // no thread starts big_calc2 until every thread has written its A[id]
    #pragma omp barrier
    B[id] = big_calc2(id, A);
}
Mutual exclusion:
float res = 0.0;
#pragma omp parallel
{
    float B;
    int i, id, nthrds;
    id = omp_get_thread_num();
    nthrds = omp_get_num_threads();
    // niters, big_job(), and consume() are placeholders for real work
    for (i = id; i < niters; i += nthrds) {
        B = big_job(i);
        #pragma omp critical
        {
            res += consume(B);   // only one thread at a time updates res
        }
    }
}
Using a critical section to remove the impact of false sharing
#include <omp.h>
#include <stdio.h>
static long num_steps = 100000;
double step;
#define NUM_THREADS 2
int main() {
    double pi = 0.0;
    step = 1.0 / (double) num_steps;
    omp_set_num_threads(NUM_THREADS);
    #pragma omp parallel
    {
        int i, id, nthrds;
        double x, sum;
        id = omp_get_thread_num();
        nthrds = omp_get_num_threads();
        // each thread accumulates into a private scalar, so there is no shared array to falsely share
        for (i = id, sum = 0.0; i < num_steps; i = i + nthrds) {
            x = (i + 0.5) * step;
            sum += 4.0 / (1.0 + x * x);
        }
        // one short critical section per thread combines the partial results
        #pragma omp critical
        pi += sum * step;
    }
    printf("Pi = %f\n", pi);
}
Pi with a loop and a reduction
#include <omp.h>
#include <stdio.h>
static long num_steps = 100000;
int main() {
    int i;
    double pi, sum = 0.0;
    double step = 1.0 / (double) num_steps;
    #pragma omp parallel
    {
        double x;   // private to each thread
        #pragma omp for reduction(+:sum)
        for (i = 0; i < num_steps; i++) {
            x = (i + 0.5) * step;
            sum += 4.0 / (1.0 + x * x);
        }
    }
    pi = step * sum;
    printf("Pi = %f\n", pi);
}
MPI
Some key aspects and principles that can be attributed to "Foster's Design Methodology" in MPI
programming include:
1. Partitioning: Decompose the problem into smaller tasks that can be executed
independently or in parallel by multiple processes.
2. Communication: Design efficient communication patterns between MPI processes to
exchange data and synchronize computations when necessary.
3. Agglomeration: Group related tasks together to minimize communication overhead and
improve computational efficiency.
4. Mapping: Map tasks and data onto the available processors or nodes in a way that
balances workload and minimizes idle time.
Domain Decomposition:
1. Purpose: Organizes a system into modules based on logical or functional domains.
2. Benefits: Promotes modularity, separation of concerns, and code reuse.
3. Application: Useful in large systems to manage complexity, support parallel
development, and ensure scalability.
Functional Decomposition:
1. Purpose: Breaks down system functionality into smaller, manageable tasks.
2. Benefits: Simplifies development, testing, and debugging by focusing on specific
functions or operations.
3. Application: Applied at different levels of system design to manage complexity, handle
dependencies, and improve integration.
In MPI (Message Passing Interface), the primary means of communication and coordination
between processes (or tasks) are through various MPI function calls. These function calls
facilitate data exchange, synchronization, and coordination among processes running
concurrently. Here's a summary of some essential MPI function calls commonly used in MPI
programs:
1. MPI_Init(&argc, &argv):
o Initializes the MPI execution environment.
o Parameters argc and argv are arguments typically passed to main() in C/C++
programs.
2. MPI_Finalize():
o Finalizes the MPI execution environment.
o Cleans up all MPI resources before program termination.
Process Information
3. MPI_Comm_size(MPI_COMM_WORLD, &num_procs):
o Retrieves the total number of processes in the communicator MPI_COMM_WORLD.
int num_procs;
MPI_Comm_size(MPI_COMM_WORLD, &num_procs);
4. MPI_Comm_rank(MPI_COMM_WORLD, &my_rank):
o Retrieves the rank of the calling process within the communicator
MPI_COMM_WORLD.
int my_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
5. MPI_Reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype,
MPI_Op op, int root, MPI_Comm comm):
Reduces values on all processes to a single value on the root process using the operation
specified by op.
MPI_Reduce(&local_result, &global_result, 1, MPI_DOUBLE, MPI_SUM, 0,
MPI_COMM_WORLD);
6. MPI_Barrier(MPI_Comm comm):
Blocks processes until all processes in the communicator comm have reached this point.
MPI_Barrier(MPI_COMM_WORLD);
Here's a simple example of how these MPI function calls might be used together in a program (it assumes at least two processes, e.g. mpirun -np 2):
#include <stdio.h>
#include <mpi.h>
int main(int argc, char *argv[]) {
int num_procs, my_rank;
int my_data = 42, recv_data;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &num_procs);
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
if (my_rank == 0) {
MPI_Send(&my_data, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
} else if (my_rank == 1) {
MPI_Recv(&recv_data, 1, MPI_INT, 0, 0, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
printf("Process %d received data: %d\n", my_rank, recv_data);
}
MPI_Finalize();
return 0;
}
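The example above covers point-to-point communication only. As a complementary sketch (the variable names are illustrative), the following program uses MPI_Barrier and MPI_Reduce to sum the ranks of all processes onto process 0:
#include <stdio.h>
#include <mpi.h>
int main(int argc, char *argv[]) {
    int num_procs, my_rank;
    double local_result, global_result;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &num_procs);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
    local_result = (double) my_rank;   /* each process contributes its own rank */
    MPI_Barrier(MPI_COMM_WORLD);       /* all processes reach this point before reducing */
    MPI_Reduce(&local_result, &global_result, 1, MPI_DOUBLE, MPI_SUM, 0,
               MPI_COMM_WORLD);
    if (my_rank == 0)
        printf("Sum of all ranks = %f\n", global_result);
    MPI_Finalize();
    return 0;
}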
MULTITHREADING
A thread is an independent unit of execution.
A thread is also called a light-weight process.
A program may contain multiple threads.
Each thread may have its own local variables. In Java, the Runnable
interface and the Thread class of the package java.lang are used to
implement threads.
To implement a thread, the desired class must implement the
Runnable interface and provide the run() method.
public class MyThread implements Runnable {
    public void run() {
        // implementation of the thread's work
    }
}
The Thread class can then be used to start a thread as follows:
public class TestThread
{
public static void main( String[] args )
{
MyThread m = new MyThread();
Thread t = new Thread(m);
t.start();   // call start() on the Thread object, not on the Runnable
}
}
Write a class that implements Runnable.
Define a constructor that takes the name of the thread as argument.
The thread upon execution will print the name of the thread in a while
loop.
Define and run 5 thread objects. What output do you see?
In the above task, modify the run method to randomly sleep the thread for
a few milliseconds, and observe the output.
import java.util.Random;
class MyRunnable implements Runnable {
private String threadName;
private Random random = new Random();
public MyRunnable(String threadName) {
this.threadName = threadName;
}
@Override
public void run() {
while (true) {
System.out.println(threadName);
try {
    Thread.sleep(random.nextInt(100)); // random sleep between 0 and 99 milliseconds
} catch (InterruptedException e) {
    System.out.println(threadName + " interrupted");
}
}
}
public static void main(String[] args) {
for (int i = 1; i <= 5; i++) {
Thread thread = new Thread(new MyRunnable("Thread-" + i));
thread.start();
}
}
}
Unsynchronized
public class UnsynchronizedExample {
public static void main(String[] args) {
new PrintStringsThread("Hello ", "there.");
new PrintStringsThread("How are ", "you?");
new PrintStringsThread("Thank you ", "very much!");
}
}
public class PrintStringsThread implements Runnable {
Thread thread;
String str1, str2;
PrintStringsThread(String str1, String str2) {
this.str1 = str1;
this.str2 = str2;
thread = new Thread(this);
thread.start();
}
public void run() {
TwoStrings.print(str1, str2);
}
}
public class TwoStrings {
    // This method is not synchronized
    static void print(String str1, String str2) {
        System.out.print(str1);
        try {
            Thread.sleep(500);   // pause so another thread can interleave its output
        } catch (InterruptedException e) {
            System.out.println("Thread interrupted.");
        }
        System.out.println(str2);
    }
}
SYNCHRONIZED (just add the synchronized keyword)
public class TwoStrings {
// This method is synchronized
static synchronized void print(String str1, String str2) {
System.out.print(str1);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
System.out.println("Thread interrupted.");
}
System.out.println(str2);
}
}
MESI PROTOCOL
For MESI, the data cache includes two status bits per tag, so that each line
can be in one of four states:
Modified: The line in the cache has been modified (different from main
memory) and is available only in this cache.
Exclusive: The line in the cache is the same as that in main memory
and is not present in any other cache.
Shared: The line in the cache is the same as that in main memory and
may be present in another cache.
Invalid: The line in the cache does not contain valid data.
1. Read Miss
When a processor tries to read data that is not in its cache (read miss):
Step 1: The processor initiates a memory read to fetch the data.
Step 2: The processor sends a signal on the bus to notify other
caches.
Possible Scenarios:
1. Another Cache has the Data in Exclusive State:
o The cache with the exclusive copy signals it shares the data.
o This cache changes its state to shared.
o The initiating processor reads from main memory and sets its
state to shared.
2. Other Caches have the Data in Shared State:
o All caches with the shared copy signal they share the data.
o The initiating processor reads from main memory and sets its
state to shared.
3. Another Cache has the Data in Modified State:
o The cache with the modified copy blocks the memory read
and sends the data over the bus.
o This cache changes its state to shared.
o The memory controller updates the main memory with this
data.
o The initiating processor receives the data and sets its state to
shared.
4. No Cache has the Data:
o No signals are returned.
o The initiating processor reads from main memory and sets its
state to exclusive.
2. Read Hit
When a processor reads data that is already in its cache (read hit):
The processor reads the data.
No state change occurs.
3. Write Miss
When a processor tries to write data that is not in its cache (write miss):
Step 1: The processor initiates a memory read with intent to
modify (RWITM).
Step 2: The processor sends a signal on the bus to notify other
caches.
Possible Scenarios:
1. Another Cache has the Data in Modified State:
o The cache with the modified copy signals this.
o The initiating processor waits.
o The other cache writes the modified data back to memory
and invalidates its copy.
o The initiating processor reads from main memory, modifies
the data, and sets its state to modified.
2. No Cache has the Data in Modified State:
o No signals are returned.
o The initiating processor reads from main memory and
modifies the data.
o If other caches have the data in shared or exclusive state, they
invalidate their copies.
4. Write Hit
When a processor writes data that is already in its cache (write hit):
Shared State:
o The processor signals its intent to gain exclusive ownership.
o Other caches invalidate their copies.
o The processor updates the data and sets its state to modified.
Exclusive State:
o The processor has exclusive control, so it updates the data
and sets its state to modified.
Modified State:
o The processor already has exclusive control and modified
state, so it simply updates the data.
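The transitions above can be condensed into a small state machine. The following C fragment is a minimal sketch of one line's state from a single cache's point of view; bus signalling, data transfer, and memory write-backs are reduced to comments, and all names are illustrative assumptions:
typedef enum { MODIFIED, EXCLUSIVE, SHARED, INVALID } mesi_state;

/* local processor reads the line */
mesi_state on_local_read(mesi_state s, int another_cache_has_line) {
    if (s != INVALID)
        return s;   /* read hit: no state change */
    /* read miss: shared if some other cache holds the line, else exclusive */
    return another_cache_has_line ? SHARED : EXCLUSIVE;
}

/* local processor writes the line; other caches invalidate their copies */
mesi_state on_local_write(mesi_state s) {
    (void)s;
    return MODIFIED;   /* write hit or miss always ends in Modified */
}

/* another processor reads this line over the bus */
mesi_state on_remote_read(mesi_state s) {
    if (s == MODIFIED)   /* we supply the data; main memory is updated */
        return SHARED;
    if (s == EXCLUSIVE)
        return SHARED;
    return s;
}

/* another processor writes this line (or reads with intent to modify) */
mesi_state on_remote_write(mesi_state s) {
    (void)s;
    return INVALID;   /* our copy is now stale */
}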
L1-L2 cache consistency
Write-Through Policy in L1 Cache:
When data is written to the L1 cache, it is also written to the L2 cache.
This ensures that the L1 cache is always a subset of the L2 cache.
Any update to the L1 cache is immediately reflected in the L2 cache.
This makes the cache coherence protocol simpler.
Consistency: L1 and L2 caches are always in sync.
Increased Latency and Traffic: Writing to both caches can be slower
and cause more data traffic.
Write-Back Policy in L1 Cache
Data is written only to the L1 cache.
Data is written to the L2 cache only when it is removed from the L1
cache.
L1 cache may have newer data than the L2 cache.
The cache coherence protocol needs to handle this complexity.
Better Performance: Faster writes to L1 and less traffic between L1
and L2.
Complex Protocol: L2 might have outdated data, requiring more
complex management.
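A minimal C sketch of the two write policies, assuming a toy one-word-per-line layout (the arrays l1, l2, and dirty are illustrative, not a real cache organization):
#define LINES 64
static int l1[LINES], l2[LINES], dirty[LINES];

/* write-through: every L1 write is propagated to L2 immediately */
void write_through(int line, int val) {
    l1[line] = val;
    l2[line] = val;          /* L1 stays a subset of L2 at all times */
}

/* write-back: L2 sees the new value only when the line is evicted */
void write_back(int line, int val) {
    l1[line] = val;
    dirty[line] = 1;         /* remember that L1 is newer than L2 */
}

void evict(int line) {
    if (dirty[line]) {
        l2[line] = l1[line]; /* flush the newer L1 data down to L2 */
        dirty[line] = 0;
    }
}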
MAP REDUCE
MapReduce is a programming model and processing technique for
distributed computing based on two key functions: Map and Reduce. It
is designed to process large-scale datasets in parallel, distributing the
computation across a cluster of computers.
-WHY MAP REDUCE
The traditional centralized model is not suitable for processing huge volumes of data:
such datasets cannot be accommodated by standard database servers, and the centralized
system becomes a bottleneck when many files must be processed simultaneously.
-How MapReduce Works
Map Task:
1. Input Data: The process begins with the input data, which is
typically stored in a distributed file system.
2. Mapping Function:
o The Map task takes a set of data and converts it into another
set of data.
o Individual elements are broken down into tuples (key-value
pairs).
o Example: For a word count program, the Mapper reads lines
of text and outputs (word, 1) pairs for each word.
Reduce Task:
1. Input from Map Task: The Reduce task takes the output from the
Map task as input.
2. Combining Tuples:
o The Reduce task combines these data tuples (key-value pairs)
into a smaller set of tuples.
o It processes the grouped key-value pairs to produce the final
output.
o Example: For a word count program, the Reducer sums the
values for each key (word) and outputs (word, total_count)
pairs.
The overall workflow:
1. Map Phase: Each chunk of input data is processed by a Mapper function.
2. Shuffle and Sort: The framework collects and sorts the key-value
pairs from the Map phase.
3. Reduce Phase: The grouped key-value pairs are processed by the
Reducer function.
4. Output: The final output from the Reducer is written to the
distributed file system.
The Map task transforms the input data into key-value pairs.
The Reduce task aggregates the results from the Map task.
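As an illustration of the word-count example, the following single-process C sketch mimics the map, shuffle/sort, and reduce phases in memory; a real MapReduce framework runs these phases distributed across a cluster, and the fixed-size buffers here are simplifying assumptions:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* a (word, count) pair, the key-value record passed between phases */
typedef struct { char word[32]; int count; } kv;

static int by_word(const void *a, const void *b) {
    return strcmp(((const kv *)a)->word, ((const kv *)b)->word);
}

int main(void) {
    const char *input = "the quick fox the lazy fox the";
    kv pairs[64];
    int n = 0;

    /* map phase: emit (word, 1) for every word in the input */
    char buf[256];
    strcpy(buf, input);
    for (char *w = strtok(buf, " "); w != NULL; w = strtok(NULL, " ")) {
        strcpy(pairs[n].word, w);
        pairs[n].count = 1;
        n++;
    }

    /* shuffle and sort phase: group identical keys next to each other */
    qsort(pairs, n, sizeof(kv), by_word);

    /* reduce phase: sum the values of each group of identical keys */
    for (int i = 0; i < n; ) {
        int j = i, total = 0;
        while (j < n && strcmp(pairs[j].word, pairs[i].word) == 0)
            total += pairs[j++].count;
        printf("(%s, %d)\n", pairs[i].word, total);
        i = j;
    }
    return 0;
}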
MapReduce Workflow for Tweet Tokenization and Analysis
Overview
1. Tokenize: Tokenizes the tweets into maps of tokens and writes
them as key-value pairs.
2. Filter: Filters unwanted words from the maps of tokens and writes
the filtered maps as key-value pairs.
3. Count: Generates a token counter per word.
4. Aggregate Counters: Prepares an aggregate of similar counter
values into small manageable units.
Key Tasks in MapReduce
1. Map Task: Handled by the Mapper class, which takes the input,
tokenizes it, and maps and sorts the data.
2. Reduce Task: Handled by the Reducer class, which takes the
output of the Mapper class, searches for matching pairs, and
reduces them.
VECTOR COMPUTATION APPROACHES
Vector Processing
Overview: Vector processing focuses on performing operations on
arrays or vectors of data in a systematic and efficient manner. This
approach assumes that operations can be applied simultaneously across
elements of a vector, leveraging SIMD (Single Instruction, Multiple
Data) architectures.
Characteristics:
SIMD Architecture: Executes a single instruction across multiple
data elements simultaneously.
Vector Registers: Specialized registers capable of holding and
manipulating multiple data elements at once.
Optimized for Arrays: Designed to handle arithmetic operations
efficiently on large arrays or vectors of floating-point numbers.
Example: Vector processors in supercomputers are optimized to execute
computations on vectors with high throughput, typically achieving high
levels of performance in numerical calculations.
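As a concrete illustration, OpenMP (used earlier in these notes) exposes SIMD execution through its simd construct; this minimal sketch asks the compiler to apply one vector instruction across several adjacent array elements per loop iteration:
/* element-wise vector addition; the pragma requests SIMD code generation */
void vadd(int n, const double *a, const double *b, double *c) {
    #pragma omp simd
    for (int i = 0; i < n; i++)
        c[i] = a[i] + b[i];
}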
Parallel Processing
Overview: Parallel processing involves distributing computational tasks
across multiple independent processors to enhance overall performance
and efficiency. This approach assumes tasks can be divided into smaller
units that can execute concurrently.
Characteristics:
Multiple Processors: Utilizes N independent processors (cores or
nodes) that work simultaneously on different parts of the
computation.
Task Partitioning: Divides the workload into smaller tasks that
are assigned to different processors.
Communication: Requires mechanisms for synchronization and
communication between processors to ensure coherence and
completion of tasks.
Example: In supercomputers, parallel processing techniques such as
MPI (Message Passing Interface) or OpenMP (Open Multi-Processing)
are used to distribute computations across nodes in a cluster, allowing
for large-scale simulations and data-intensive calculations.
Approaches
1. Pipelined ALU (Arithmetic Logic Unit)
Concept:
Purpose: Breaks down complex floating-point operations into
stages (compare, shift, add, normalize) that operate concurrently
on different sets of data.
Execution: Sequentially processes a stream of data from sequential
memory locations.
Speedup: Achieves time savings when fed with sequential data
streams, enhancing performance by keeping the pipeline full.
2. Parallel ALUs (Arithmetic Logic Units)
Concept:
Design: Uses multiple ALUs within a single processor under the
control of a single control unit.
Operation: Data is routed to ALUs concurrently, enabling parallel
processing of vector elements.
Enhancement: Pipelining can be applied to each ALU, improving
efficiency for vector operations.
3. Parallel Processors
Concept:
Structure: Involves multiple processors working in parallel to
execute tasks.
Execution: Requires breaking tasks into smaller processes that can
be executed concurrently across multiple processors.
Complexity: Effective coordination between hardware and
software is crucial for optimal performance.