Skip to content

Instantly share code, notes, and snippets.

@tinesubic
Created September 5, 2016 12:15
Show Gist options
  • Save tinesubic/67c11cce6b226de043e3074fc7086c4f to your computer and use it in GitHub Desktop.
Save tinesubic/67c11cce6b226de043e3074fc7086c4f to your computer and use it in GitHub Desktop.
PS Final Parallelization Code
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<math.h>
#include<time.h>
#include<sys/time.h>
#include <limits.h>
#include <mpi.h>
#define PI 3.14159265358979323846
#define MAX_ANGLE 85.0
typedef struct {
double *x;
double *y;
long N;
} lines;
double getTime() {
struct timeval mytime;
gettimeofday(&mytime, (struct timezone *) 0);
return mytime.tv_sec + mytime.tv_usec * 1.0e-6;
}
unsigned int rand32();
lines generateData(long N, double p, double maxAngle);
int *initDispls(long num, int sectors) {
int *displs = (int *) malloc(sectors * sizeof(int));
for (int i = 0; i < sectors; ++i)
displs[i] = (int) (num / sectors) * i;
return displs;
}
int *initSendCounts(long num, int sectors) {
int *counts = (int *) malloc(sectors * sizeof(int));
//displs = (int *) malloc(sectors * sizeof(int));
for (int i = 0; i < sectors; ++i) {
counts[i] = (int) num / sectors;
}
return counts;
}
int main(int argc, char *argv[]) {
long counter = 0;
double heightCounter = 0;
long zeroIndex = 0;
int size = atoi(argv[1]);
int my_rank; // rank (oznaka) procesa
int num_of_processes; // število procesov
MPI_Status status; // status sprejema
lines data;
MPI_Init(&argc, &argv); // inicializacija MPI okolja
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank); // poizvedba po ranku procesa
MPI_Comm_size(MPI_COMM_WORLD, &num_of_processes); // poizvedba po številu procesov
double p = 0.01;
long N = (long) (size * pow((float) 10, 6));
long sectionSize = N / num_of_processes;
double currK = 0;
double prevK = 0;
double *xRecv = (double *) malloc(sectionSize * sizeof(double));
double *yRecv = (double *) malloc(sectionSize * sizeof(double));
int *indexes = (int *) malloc(sectionSize * sizeof(int));
long prevIndex;
double *xInit = NULL;
double *yInit = NULL;
int *sendcounts = NULL;
int *displs = NULL;
int *finalIndexes = NULL;
double *k = NULL;
memset(indexes, 0, sectionSize);
fflush(stdout);
double time1 = 0;
sendcounts = initSendCounts(N, num_of_processes);
displs = initDispls(N, num_of_processes);
if (my_rank == 0) {
fflush(stdout);
srand(3);
printf("Generating data: %zu MB\n", N * 2 * sizeof(float) / 1024 / 1024);
if (N > INT_MAX) {
printf("WARNING!! N Over INT_MAX");
}
data = generateData(N, p, MAX_ANGLE);
xInit = data.x;
yInit = data.y;
printf("Number of lines: %d*10^6\n", size);
printf("Threads: %d, Elements/thread: %li\n", num_of_processes, N / num_of_processes);
finalIndexes = (int *) malloc(N * sizeof(int));
k = (double *) malloc(N * sizeof(double));
time1 = getTime();
}
MPI_Scatterv(xInit, sendcounts, displs, MPI_DOUBLE, xRecv, sectionSize, MPI_DOUBLE, 0, MPI_COMM_WORLD);
MPI_Scatterv(yInit, sendcounts, displs, MPI_DOUBLE, yRecv, sectionSize, MPI_DOUBLE, 0, MPI_COMM_WORLD);
if (my_rank == 0) {
k[0] = yRecv[0] / xRecv[0];
heightCounter = yInit[0];
zeroIndex = 0;
for (int i = 1; i < sectionSize; ++i) {
if (yRecv[i] > yRecv[zeroIndex]) {
k[i] = yInit[i] / xInit[i];
if (k[zeroIndex] > k[i])
k[i] = k[zeroIndex];
if (k[i] > k[zeroIndex]) {
counter++;
heightCounter += yInit[i] - k[zeroIndex] * xInit[i];
}
zeroIndex = i;
}
}
} else {
prevK = yRecv[0] / xRecv[0];
prevIndex = 0;
for (int i = 1; i < sectionSize; ++i) {
if (yRecv[i] > yRecv[prevIndex]) {
currK = (prevK > yRecv[i] / xRecv[i]) ? prevK : yRecv[i] / xRecv[i];
if (currK > prevK) {
indexes[i] = 1;
prevIndex = i;
prevK = currK;
}
}
}
}
MPI_Gatherv(indexes, sectionSize, MPI_INT, finalIndexes, sendcounts, displs, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Barrier(MPI_COMM_WORLD);
if (my_rank == 0) {
for (long i = sectionSize; i < N; ++i) {
if (finalIndexes[i] == 1) {
if (yInit[i] > yInit[zeroIndex]) {
k[i] = yInit[i] / xInit[i];
if (k[zeroIndex] > k[i])
k[i] = k[zeroIndex];
if (k[i] > k[zeroIndex]) {
counter++;
heightCounter += yInit[i] - k[zeroIndex] * xInit[i];
}
zeroIndex = i;
}
}
}
printf("Visible lines: %li\n", counter);
printf("Visible height: %.6f\n", heightCounter);
printf("Elapsed: %.6f\n", getTime() - time1);
free(k);
fflush(stdout);
}
free(displs);
free(sendcounts);
free(xRecv);
free(yRecv);
free(indexes);
MPI_Finalize();
return 0;
}
unsigned int rand32() {
unsigned int r = (rand() & 0x7fff) | ((rand() & 0x7fff) << 15) | ((rand() & 0x3) << 30);
return r;
}
lines generateData(long N, double p, double maxAngle) {
lines data;
data.N = N;
data.y = (double *) malloc(N * sizeof(double));
data.x = (double *) malloc(N * sizeof(double));
double maxk = tan(maxAngle / 180 * PI);
double num_visible = round(N * p);
double kstep = maxk / num_visible;
double xstep = 1000.0 / N;
for (long i = 0; i < N; i++)
data.x[i] = xstep + i * xstep;
double k = kstep;
for (long long i = 0; i < N; i += 1) {
if ((1.0 * rand32() / UINT_MAX) < p) {
//section of a line visible
data.y[i] = data.x[i] * k;
k += kstep;
} else {
data.y[i] = data.x[i] * ((1.0 * rand32() / UINT_MAX) * (k - kstep));
}
}
return data;
}
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<math.h>
#include<time.h>
#include <omp.h>
#include<sys/time.h>
#include <limits.h>
#define PI 3.14159265358979323846
#define DEBUG 1
#define THREADS 32
#define MAX_ANGLE 85.0
typedef struct {
double *x;
double *y;
long N;
} lines;
void runTests(int N);
unsigned int rand32();
lines generateData(long N, double p, double maxAngle);
double solveParallel(double *x, double *y, long N);
double solveSerial(double *x, double *y, long N);
double roundToN(double x, unsigned int digits);
int main(int argc, char *argv[]) {
srand(time(NULL));
runTests(atoi(argv[1]));
return 0;
}
/*
* Runs algorithm and compares results.
*/
void runTests(int size) {
double p = 0.01;
long N = (long) (size * pow((float) 10, 6));
//Size of data in MB
printf("Generating data: %zu MB\n", N * 2 * sizeof(float) / 1024 / 1024);
lines data = generateData(N, p, MAX_ANGLE);
printf("Number of lines: %d*10^6\n", size);
double resParallel = solveParallel(data.x, data.y, data.N);
double resSerial = solveSerial(data.x, data.y, data.N);
if (roundToN(resParallel, 6) == roundToN(resSerial, 6))
printf("SUCCESS!\n");
else
printf("FAIL: %.6f\n", resSerial - resParallel);
free(data.x);
free(data.y);
}
/*
* Rounds double to Nth decimal digit
*/
double roundToN(double x, unsigned int digits) {
return digits > 0 ? roundToN(x * 10.0, digits - 1) / 10.0 : round(x);
}
/*
* Returns current time as double
*/
double getTime() {
struct timeval mytime;
gettimeofday(&mytime, (struct timezone *) 0);
return mytime.tv_sec + mytime.tv_usec * 1.0e-6;
}
/*
* Runs a serial algorithm and return the result - visible height
*/
double solveSerial(double *x, double *y, long N) {
printf("----SERIAL----\n");
fflush(stdout);
int counter = 0;
double heightCounter = 0;
double *k = (double *) malloc(N * sizeof(double));
k[0] = y[0] / x[0];
double visibleHeight = y[0];
double timeStart = getTime();
for (long i = 1; i < N; ++i) {
k[i] = (k[i - 1] > y[i] / x[i]) ? k[i - 1] : y[i] / x[i];
if (k[i] > k[i - 1]) {
counter++;
heightCounter += y[i] - k[i - 1] * x[i];
}
}
if (DEBUG) {
printf("Visible lines: %d\n", counter);
printf("Visible height: %.6f\n", heightCounter);
}
printf("Elapsed: %.6f\n", getTime() - timeStart);
free(k);
return visibleHeight;
}
/*
* Runs the parallel implementation and returns the result
*/
double solveParallel(double *x, double *y, long N) {
//int maxThreads = omp_get_max_threads();
int maxThreads = THREADS;
omp_set_num_threads(maxThreads);
printf("---PARALLEL---\n");
printf("Threads: %d, Elements/thread: %li\n", maxThreads, N / maxThreads);
int counter = 0;
double heightCounter = 0;
double *k = (double *) malloc(N * sizeof(double));
int *indexes = (int *) malloc(N * sizeof(int));
long *prevIndexes = (long *) malloc(maxThreads * sizeof(long));
memset(indexes, 0, N);
memset(prevIndexes, 0, maxThreads);
fflush(stdout);
double visibleHeight = y[0];
heightCounter = visibleHeight;
long section = N / maxThreads;
double timeStart = getTime();
long prevIndex = 0;
/*
* Each thread receives its own section for calculation.
* Start values get initialised.
* Thread 0:
* Loop skips lines shorter than previous highest. For suitable lines, coefficient is calculated. If line is
* visible, counter and height rise and index for previous highest lines is updated.
* Other threads:
* Loop skips lines shorter than previous highest. For suitable lines, coefficient is calculated. If line is
* visible, index list is updated and previous highest line index variable is updated for this thread.
*
* Upon exiting parallel section, another loop recalculates values from end of section 0 forward. Shorter lines are
* skipped again, visible lines get added to height and line counter. This produces the final result.
*/
#pragma omp parallel
{
int threadNum = omp_get_thread_num();
long start = threadNum * section;
long end = (threadNum + 1) * section;
k[start] = y[start] / x[start];
prevIndexes[threadNum] = start;
if (threadNum == 0) {
for (long i = start + 1; i < end; ++i) {
if (y[i] > y[prevIndex]) {
k[i] = (k[prevIndex] > y[i] / x[i]) ? k[prevIndex] : y[i] / x[i];
if (k[i] > k[prevIndex]) {
counter++;
heightCounter += y[i] - k[prevIndex] * x[i];
prevIndex = i;
}
}
}
} else {
for (long i = start + 1; i < end; ++i) {
if (y[i] > y[prevIndexes[threadNum]]) {
k[i] = (k[prevIndexes[threadNum]] > y[i] / x[i]) ? k[prevIndexes[threadNum]] : y[i] / x[i];
if (k[i] > k[prevIndexes[threadNum]]) {
indexes[i] = 1;
prevIndexes[threadNum] = i;
}
}
}
}
}
for (long i = N / maxThreads; i < N; ++i) {
if (indexes[i] == 1) {
if (y[i] > y[prevIndex]) {
k[i] = (k[prevIndex] > y[i] / x[i]) ? k[prevIndex] : y[i] / x[i];
if (k[i] > k[prevIndex]) {
counter++;
heightCounter += y[i] - k[prevIndex] * x[i];
prevIndex = i;
}
}
}
}
if (DEBUG) {
printf("Visible lines: %d\n", counter);
printf("Visible height: %.6f\n", heightCounter);
}
printf("Elapsed: %.6f\n", getTime() - timeStart);
free(indexes);
free(prevIndexes);
free(k);
return visibleHeight;
}
/*
* Generates a random 32 bit number
*/
unsigned int rand32() {
unsigned int r = (rand() & 0x7fff) | ((rand() & 0x7fff) << 15) | ((rand() & 0x3) << 30);
return r;
}
/*
* Random data generation.
*/
lines generateData(long N, double p, double maxAngle) {
//Allocate memory for the data
lines data;
data.N = N;
data.y = (double *) malloc(N * sizeof(double));
data.x = (double *) malloc(N * sizeof(double));
//Compute the maximum LOS coefficient
double maxk = tan(maxAngle / 180 * PI);
//Compute the approximate number of visible line sections
double num_visible = round(N * p);
//coefficient step
double kstep = maxk / num_visible;
//min x-coordinate step
double xstep = 1000.0 / N;
//generate ascending x-coordinates of lines
for (long i = 0; i < N; i++)
data.x[i] = xstep + i * xstep;
//generate heights of lines according to the probability of a visible line p
double k = kstep;
for (long long i = 0; i < N; i += 1) {
if ((1.0 * rand32() / UINT_MAX) < p) {
//section of a line visible
data.y[i] = data.x[i] * k;
k += kstep;
} else
//line not visible
data.y[i] = data.x[i] * ((1.0 * rand32() / UINT_MAX) * (k - kstep));
}
return data;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment