Skip to content

Instantly share code, notes, and snippets.

@karthick18
Created May 2, 2011 08:16

Revisions

  1. karthick18 revised this gist Aug 8, 2011. 2 changed files with 48 additions and 10 deletions.
    10 changes: 8 additions & 2 deletions Makefile
    Original file line number Diff line number Diff line change
    @@ -1,6 +1,12 @@
    CC := gcc
    CFLAGS := -Wall -g -std=c99
    LD_LIBS := -lpthread -lrt
    LD_LIBS := -lpthread
    ARCH := $(shell uname)
    BLOCK_SIZE := 1m
    ifeq ("$(strip $(ARCH))", "Linux")
    LD_LIBS += -lrt
    BLOCK_SIZE := 1M
    endif
    SRCS := prsort.c
    OBJS := $(SRCS:%.c=%.o)
    TARGET := prsort
    @@ -16,7 +22,7 @@ run: create_file
    ./$(TARGET) $(INPUT_FILE) $(BUFFER_SIZE)

    create_file: $(INPUT_FILE)
    dd if=/dev/urandom of=$< bs=1M count=$(SIZE)
    dd if=/dev/urandom of=$< bs=$(BLOCK_SIZE) count=$(SIZE)

    prsort: $(OBJS)
    $(CC) -o $@ $^ $(LD_LIBS)
    48 changes: 40 additions & 8 deletions prsort.c
    Original file line number Diff line number Diff line change
    @@ -21,8 +21,15 @@
    #include <assert.h>
    #include <pthread.h>
    #include <time.h>
    #include <sys/time.h>
    #include <sched.h>

    #ifdef __linux__
    #define offset_t loff_t
    #else
    #define offset_t off_t
    #endif

    struct sort_run
    {
    void *items;
    @@ -32,17 +39,26 @@ struct sort_run
    int (*cmp)(const void *, const void *);
    int input_fd;
    int output_fd;
    loff_t offset;
    offset_t offset;
    char input_file[40];
    char output_file[40];
    };

    #ifdef CLOCK_MONOTONIC
    static __inline__ void __time(unsigned long long *start)
    {
    struct timespec t;
    clock_gettime(CLOCK_MONOTONIC, &t);
    *start = (unsigned long long)t.tv_sec * 1000000LL + t.tv_nsec/1000;
    }
    #else
    static __inline__ void __time(unsigned long long *start)
    {
    struct timeval t;
    gettimeofday(&t, NULL);
    *start = (unsigned long long)t.tv_sec * 1000000LL + t.tv_usec;
    }
    #endif

    static __inline__ void __swap(char *a, char *b, int s)
    {
    @@ -146,8 +162,8 @@ static void *parallel_sort(void *arg)
    struct sort_run *run = arg;
    int num_input = run->length/run->size;
    int err = __qsort(run->items, num_input, run->size, run->cmp);
    printf("Sorting done for run [%ld] with [%d] items [%s]\n",
    run - g_runs, run->length/run->size, err ? "Unsuccessfully" : "Successfully");
    printf("Sorting done for run [%d] with [%d] items [%s]\n",
    (int)(run - g_runs), (int) run->length/run->size, err ? "Unsuccessfully" : "Successfully");
    return (void *)(unsigned long)err;
    }

    @@ -366,7 +382,7 @@ static int check_libc_qsort(int fd, size_t size)
    {
    struct sort_run output_run = {0};
    char *buf = calloc(1, size);
    loff_t offset = 0;
    offset_t offset = 0;
    assert(buf);
    int bytes = 0;
    lseek(fd, 0, SEEK_SET);
    @@ -387,12 +403,19 @@ static int check_libc_qsort(int fd, size_t size)
    return 0;
    }

    #ifdef __linux__
    static int get_cpu_count(void)
    {
    cpu_set_t set;
    sched_getaffinity(0, sizeof(set), &set);
    return CPU_COUNT(&set);
    }
    #else
    static int get_cpu_count(void)
    {
    return 4;
    }
    #endif

    int main(int argc, char **argv)
    {
    @@ -492,14 +515,23 @@ int main(int argc, char **argv)
    output_run = merge_runs(runs, i, &output_run, NULL);
    }
    __time(&end);
    printf("Time taken to sort [%ld] items = [%lld] usecs\n", statbuf.st_size/sizeof(int), end-start);
    printf("Time taken to sort [%ld] items = [%lld] usecs\n", (long int) (statbuf.st_size/sizeof(int)), end-start);
    if(output_run == runs) runs = NULL;

    out:
    for(i = 0; i < num_threads; ++i)
    if(runs)
    {
    if(runs[i].items) free(runs[i].items);
    for(i = 0; i < num_threads; ++i)
    {
    if(runs[i].items)
    {
    free(runs[i].items);
    runs[i].items = NULL;
    }
    }
    free(runs);
    }
    free(runs);

    if(output_run)
    {
    if(output_run->items)
  2. karthick18 revised this gist May 9, 2011. 2 changed files with 71 additions and 58 deletions.
    28 changes: 28 additions & 0 deletions Makefile
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,28 @@
    CC := gcc
    CFLAGS := -Wall -g -std=c99
    LD_LIBS := -lpthread -lrt
    SRCS := prsort.c
    OBJS := $(SRCS:%.c=%.o)
    TARGET := prsort
    SIZE := 4
    INPUT_FILE := random.dat
    BUFFER_SIZE := 1024

    all: $(TARGET)

    .PHONY: $(INPUT_FILE) create_file run

    run: create_file
    ./$(TARGET) $(INPUT_FILE) $(BUFFER_SIZE)

    create_file: $(INPUT_FILE)
    dd if=/dev/urandom of=$< bs=1M count=$(SIZE)

    prsort: $(OBJS)
    $(CC) -o $@ $^ $(LD_LIBS)

    %.o: %.c
    $(CC) -c $(CFLAGS) -o $@ $<

    clean:
    rm -f $(OBJS) $(TARGET) *~
    101 changes: 43 additions & 58 deletions prsort.c
    Original file line number Diff line number Diff line change
    @@ -32,6 +32,7 @@ struct sort_run
    int (*cmp)(const void *, const void *);
    int input_fd;
    int output_fd;
    loff_t offset;
    char input_file[40];
    char output_file[40];
    };
    @@ -198,28 +199,45 @@ static int verify_sort_file(struct sort_run *run)
    return 0;
    }

    static struct sort_run *merge_run(struct sort_run *run1, struct sort_run *run2)
    static struct sort_run *merge_runs(struct sort_run *runs, int num_runs,
    struct sort_run **ret_output_run, int *total_runs)
    {
    struct sort_run *output_run;
    struct sort_run *runs[2] = { run1, run2 };
    struct sort_run *output_run = *ret_output_run;
    int run_item_size,buf_size;
    int (*run_cmp)(const void *,const void *) = NULL;
    int cur_runs = 2, num_runs = 2;
    int run_index_map[2] = {0};
    int cur_output_index = 0;
    int i;
    int cur_runs = num_runs + (output_run ? 1 : 0);
    int run_index_map[cur_runs];
    struct sort_run *run_array[cur_runs];
    int cur_output_index = 0, cur_run_index = 0;
    static char input_file[] = "foo";
    static char output_file[] = "bar";
    char *output_buffer = NULL;
    int i;
    if(!run1) return run2;
    assert(run1->size == run2->size);
    assert(run1->cmp == run2->cmp);
    run_cmp = run1->cmp;
    run_item_size = run1->size;
    buf_size = run1->buf_size;
    if(run1->output_fd < 0)

    if(!output_run && num_runs <= 1) return runs;
    if(!runs) return output_run;

    run_cmp= runs->cmp;
    run_item_size = runs->size;
    buf_size = runs->buf_size;

    if(output_run)
    {
    output_run = calloc(1, sizeof(*output_run));
    run_index_map[cur_run_index] = 0;
    run_array[cur_run_index++] = output_run;
    }
    for(i = cur_run_index; i < num_runs + cur_run_index; ++i)
    {
    run_array[i] = runs + (i - cur_run_index);
    run_index_map[i] = 0;
    }

    assert(i == cur_runs);
    num_runs = cur_runs;

    if(!output_run)
    {
    *ret_output_run = output_run = calloc(1, sizeof(*output_run));
    assert(output_run != NULL);
    output_run->size = run_item_size;
    output_run->cmp = run_cmp;
    @@ -235,7 +253,6 @@ static struct sort_run *merge_run(struct sort_run *run1, struct sort_run *run2)
    /*
    * Swap input with output
    */
    output_run = run1;
    char tmpfile[sizeof(output_run->input_file)];
    close(output_run->output_fd);
    output_run->output_fd = -1;
    @@ -264,9 +281,9 @@ static struct sort_run *merge_run(struct sort_run *run1, struct sort_run *run2)
    void *min_item = NULL;
    for(i = 0; i < num_runs; ++i)
    {
    if(run_index_map[i] >= runs[i]->length/run_item_size)
    if(run_index_map[i] >= run_array[i]->length/run_item_size)
    continue;
    char *cur_item = (char*)runs[i]->items + run_item_size*run_index_map[i];
    char *cur_item = (char*)run_array[i]->items + run_item_size*run_index_map[i];
    if(!min_item || run_cmp(cur_item, min_item) < 0)
    {
    min_item = cur_item;
    @@ -288,22 +305,22 @@ static struct sort_run *merge_run(struct sort_run *run1, struct sort_run *run2)
    /*
    * Check for the limits. and re-read if required for the relevant run from the input file
    */
    if(run_index_map[min_run] >= runs[min_run]->length/run_item_size)
    if(run_index_map[min_run] >= run_array[min_run]->length/run_item_size)
    {
    if(runs[min_run]->input_fd < 0)
    if(run_array[min_run]->input_fd < 0)
    {
    --cur_runs;
    }
    else
    {
    int length = read(runs[min_run]->input_fd, runs[min_run]->items, buf_size);
    int length = read(run_array[min_run]->input_fd, run_array[min_run]->items, buf_size);
    if(length <= 0)
    {
    --cur_runs;
    }
    else
    {
    runs[min_run]->length = length;
    run_array[min_run]->length = length;
    run_index_map[min_run] = 0;
    }
    }
    @@ -325,6 +342,7 @@ static struct sort_run *merge_run(struct sort_run *run1, struct sort_run *run2)
    output_run->input_fd = -1;
    }
    free(output_buffer);
    if(total_runs) *total_runs += num_runs;
    return output_run;
    }

    @@ -344,39 +362,6 @@ static void dump_result(struct sort_run *run)
    static __inline__ void dump_result(struct sort_run *run) { }
    #endif

    /*
    * TODO: Merge results in 1 shot into multiple files instead of merging 2 at a time.
    */
    static int sort_merge(struct sort_run *runs, int num_runs, struct sort_run **output_run)
    {
    int i;
    int start_index = 0;
    if(!num_runs) return -1;
    /*
    * First verify if the individual runs are sorted.
    */
    #ifdef DEBUG
    for(i = 0; i < num_runs; ++i)
    {
    verify_sort(runs+i);
    printf("Verification successful for run [%d]\n", i);
    }
    #endif
    if(num_runs >= 2)
    {
    if(!*output_run)
    {
    *output_run = merge_run(runs, runs+1);
    start_index = 2;
    }
    }
    for(i = start_index; i < num_runs; ++i)
    {
    *output_run = merge_run(*output_run, runs+i);
    }
    return 0;
    }

    static int check_libc_qsort(int fd, size_t size)
    {
    struct sort_run output_run = {0};
    @@ -472,13 +457,14 @@ int main(int argc, char **argv)
    size -= length;
    if(length != runs[i].length)
    {
    if(runs[i].items) free(runs[i].items);
    runs[i].length = length;
    runs[i].items = calloc(1, length);
    }
    assert(runs[i].items != NULL);
    runs[i].input_fd = runs[i].output_fd = -1;
    runs[i].size = sizeof(int);
    runs[i].buf_size = buf_size;
    assert(runs[i].items != NULL);
    runs[i].cmp = int_cmp;
    if(read(fd, runs[i].items, length) != length)
    {
    @@ -503,8 +489,7 @@ int main(int argc, char **argv)
    {
    parallel_sort(runs);
    }

    sort_merge(runs, i, &output_run);
    output_run = merge_runs(runs, i, &output_run, NULL);
    }
    __time(&end);
    printf("Time taken to sort [%ld] items = [%lld] usecs\n", statbuf.st_size/sizeof(int), end-start);
  3. @invalid-email-address Anonymous created this gist May 2, 2011.
    534 changes: 534 additions & 0 deletions prsort.c
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,534 @@
    /*
    * A parallel sort that currently uses a single intermediate input file and a single output file to merge
    * the sort results from multiple threads in each pass.
    * The memory used by the sort is the input buffer size (in KB) multiplied by the number of CPUs, plus a running buffer of buffer_size KB.
    *
    * To compile:
    * gcc -o prsort prsort.c -Wall -g -std=c99 -pedantic -lpthread -lrt
    *
    * To run or test, create a 16 MB input file:
    * dd if=/dev/urandom of=random.dat bs=1M count=16
    *
    * ./prsort random.dat 2048 to run with 2 MB buffer size for each thread
    */
    #define _GNU_SOURCE
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <limits.h>
    #include <sys/stat.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <assert.h>
    #include <pthread.h>
    #include <time.h>
    #include <sched.h>

    /*
    * Describes one run of fixed-size items: the in-memory buffer that gets sorted and,
    * for a merge output run, the files it is streamed from and written to.
    */
    struct sort_run
    {
    /* buffer holding the items currently loaded for this run */
    void *items;
    /* number of valid bytes in items; the item count is length/size */
    int length;
    int size; /* size in bytes of each item */
    /* size in bytes of the buffers used for file reads and for the merge output */
    int buf_size;
    /* qsort-style comparator applied to two items */
    int (*cmp)(const void *, const void *);
    /* descriptor the run is refilled from while merging, -1 when there is none */
    int input_fd;
    /* descriptor of the merge output file, -1 when it is not open */
    int output_fd;
    /* name of the file opened as input_fd */
    char input_file[40];
    /* name of the file opened as output_fd */
    char output_file[40];
    };

    /*
    * Store the current CLOCK_MONOTONIC reading, converted to microseconds, in *start.
    */
    static __inline__ void __time(unsigned long long *start)
    {
        struct timespec now;
        unsigned long long usecs;

        clock_gettime(CLOCK_MONOTONIC, &now);
        /* whole seconds first, then the sub-second part scaled from nanoseconds */
        usecs = (unsigned long long)now.tv_sec * 1000000LL;
        usecs += now.tv_nsec/1000;
        *start = usecs;
    }

    /*
    * Exchange the s bytes starting at a with the s bytes starting at b, one byte at a time
    * in ascending order. Does nothing when both pointers are the same.
    */
    static __inline__ void __swap(char *a, char *b, int s)
    {
        if(a != b)
        {
            int k = 0;
            while(k < s)
            {
                char saved = a[k];
                a[k] = b[k];
                b[k] = saved;
                ++k;
            }
        }
    }

    /*
    * qsort-style comparator for two unsigned int items.
    *
    * Returns a negative value, zero, or a positive value when *a is less than, equal to,
    * or greater than *b. The result is built from two comparisons instead of a subtraction:
    * the difference of two unsigned ints converted to int has the wrong sign whenever the
    * values are more than INT_MAX apart.
    */
    static int int_cmp(const void *a, const void *b)
    {
        unsigned int lhs = *(const unsigned int *)a;
        unsigned int rhs = *(const unsigned int *)b;
        return (lhs > rhs) - (lhs < rhs);
    }

    /*
    * Partition the items with indices left..right (inclusive) of base around the item at index pivot.
    *
    * The pivot is first parked in the right-most slot, every item that compares <= pivot is
    * moved to the front of the range, and the pivot is finally swapped into the slot that
    * follows them. Returns the index where the pivot ended up.
    */
    static int __partition(void *base, int size, int left, int right, int pivot,
                           int (*cmp)(const void *, const void *))
    {
        char *bytes = base;
        char *pivot_slot = bytes + right*size;
        int store = left;
        int scan;

        /* park the pivot at the end of the range */
        __swap(bytes + pivot*size, pivot_slot, size);
        for(scan = left; scan < right; ++scan)
        {
            char *candidate = bytes + scan*size;
            if(cmp(candidate, pivot_slot) > 0)
                continue;
            __swap(bytes + size*store, candidate, size);
            ++store;
        }
        /* drop the pivot into the first slot after the "<= pivot" items */
        __swap(bytes + store*size, pivot_slot, size);
        return store;
    }

    /*
    * Bubble sort of the items with indices left..right (inclusive) of base.
    * Each pass compares adjacent items and shortens the compared range by one.
    * Always returns 0.
    */
    static int __bubble_sort(void *base, int size, int left, int right, int (*cmp)(const void *, const void *))
    {
        char *bytes = base;
        int pass;

        for(pass = 0; pass <= right - left; ++pass)
        {
            /* index of the last "lower" item of an adjacent pair in this pass */
            int last = right - 1 - pass;
            int k = left;
            while(k <= last)
            {
                char *lower = bytes + size*k;
                char *upper = lower + size;
                if(cmp(lower, upper) > 0)
                    __swap(lower, upper, size);
                ++k;
            }
        }
        return 0;
    }

    /*
    * Quicksort of the items with indices left..right (inclusive) of base.
    *
    * Ranges of fewer than 6 items are handed to __bubble_sort(). Otherwise the middle item is
    * used as the pivot; after __partition() the pivot sits in its final slot, so it is
    * excluded from both sub-ranges. Only the smaller sub-range is sorted recursively and the
    * larger one is handled by the loop, which keeps the recursion depth logarithmic in the
    * number of items.
    *
    * Always returns 0.
    */
    static int do_qsort(void *base, int size, int left, int right, int (*cmp)(const void *, const void *))
    {
        while(right > left)
        {
            if(right - left < 5)
                return __bubble_sort(base, size, left, right, cmp);
            /* midpoint computed without adding the two indices, which could overflow */
            int pivot = left + ((right - left) >> 1);
            int new_pivot = __partition(base, size, left, right, pivot, cmp);
            if(new_pivot - left < right - new_pivot)
            {
                do_qsort(base, size, left, new_pivot-1, cmp);
                left = new_pivot + 1;
            }
            else
            {
                do_qsort(base, size, new_pivot+1, right, cmp);
                right = new_pivot - 1;
            }
        }
        return 0;
    }

    /*
    * Sort nelem items of size bytes each, starting at base, with the comparator cmp.
    * Returns whatever do_qsort() returns.
    */
    static int __qsort(void *base, int nelem, int size, int (*cmp)(const void *, const void *))
    {
        /* do_qsort() takes inclusive index bounds, hence nelem - 1 */
        return do_qsort(base, size, 0, nelem - 1, cmp);
    }

    static struct sort_run *g_runs;

    /*
    * Thread entry point: sort the items of one run in place.
    *
    * arg points to the struct sort_run to sort. The run number printed is the run's offset
    * from g_runs. Returns the __qsort() status packed into a pointer.
    */
    static void *parallel_sort(void *arg)
    {
        struct sort_run *run = arg;
        int num_input = run->length/run->size;
        int err = __qsort(run->items, num_input, run->size, run->cmp);
        /* run - g_runs is a ptrdiff_t, which needs the t length modifier */
        printf("Sorting done for run [%td] with [%d] items [%s]\n",
               run - g_runs, num_input, err ? "Unsuccessfully" : "Successfully");
        return (void *)(unsigned long)err;
    }

    /*
    * Check that the items held in run->items are in non-decreasing order according to run->cmp.
    *
    * An out-of-order item is reported on stderr and trips assert(0). Returns 0 when the run
    * is ordered and 1 otherwise (reachable only when assertions are compiled out).
    */
    static int verify_sort(struct sort_run *run)
    {
        char *first = (char*)run->items;
        int count = run->length/run->size;
        int failed = 0;
        int idx = 1;

        while(idx < count)
        {
            char *cur = first + run->size*idx;
            char *before = cur - run->size;
            if(run->cmp(cur, before) < 0)
            {
                failed = 1;
                fprintf(stderr, "Sort error for item [%p] at index [%d]\n", (void*)cur, idx);
                assert(0);
            }
            ++idx;
        }
        return failed;
    }

    /*
    * Verify that the output file of run is sorted.
    *
    * The file is read back in chunks of run->buf_size bytes. Each chunk is checked with
    * verify_sort(), and the first item of every chunk is also compared with the last item of
    * the chunk before it, so ordering errors that straddle a chunk boundary are detected.
    *
    * Opens run->output_file read-only when run->output_fd is not open, otherwise rewinds the
    * open descriptor.
    *
    * Returns -1 when the run has no output file name, 1 when an ordering error was found,
    * and 0 otherwise.
    */
    static int verify_sort_file(struct sort_run *run)
    {
        char *buf;
        char *prev_last;
        int have_prev = 0;
        int bytes;
        int err = 0;
        struct sort_run output_run = {.cmp = run->cmp, .size = run->size, .buf_size = run->buf_size };
        if(!run->output_file[0])
            return -1;
        if(run->output_fd < 0)
        {
            run->output_fd = open(run->output_file, O_RDONLY);
            assert(run->output_fd >= 0);
        }
        else lseek(run->output_fd, 0, SEEK_SET);
        buf = calloc(1, run->buf_size);
        assert(buf != NULL);
        /* holds a copy of the last item of the previous chunk */
        prev_last = calloc(1, run->size);
        assert(prev_last != NULL);
        output_run.items = buf;
        int cur_run = 0;
        while( cur_run++, (bytes = read(run->output_fd, buf, run->buf_size)) > 0 )
        {
            int num_items = bytes/run->size;
            output_run.length = bytes;
            printf("Verifying sort run [%d] for file [%s]...\n", cur_run, run->output_file);
            if(verify_sort(&output_run))
                err = 1;
            if(num_items <= 0)
                continue;
            if(have_prev && run->cmp(buf, prev_last) < 0)
            {
                err = 1;
                fprintf(stderr, "Sort error at the start of sort run [%d] for file [%s]\n",
                        cur_run, run->output_file);
            }
            memcpy(prev_last, buf + (size_t)run->size*(num_items-1), run->size);
            have_prev = 1;
        }
        free(prev_last);
        free(buf);
        return err;
    }

    /*
    * Merge two sorted runs into an output file and return the run that describes that output.
    *
    * run1 is either a plain in-memory run (output_fd < 0) or the output run of an earlier
    * merge (output_fd >= 0). In the first case a new output run is allocated with calloc()
    * and given the fixed file names "foo" (input) and "bar" (output). In the second case run1
    * itself is reused: its input and output file names are swapped, so the previous output
    * becomes the input that is streamed back in buf_size chunks.
    *
    * Returns run2 unchanged when run1 is NULL, otherwise the output run. The output file is
    * left open in output_fd when the function returns.
    *
    * NOTE(review): the return values of write() are ignored, and min_item stays NULL when
    * every run is skipped while cur_runs is still positive (for example a run that holds less
    * than one item on entry) -- confirm callers never pass such a run.
    */
    static struct sort_run *merge_run(struct sort_run *run1, struct sort_run *run2)
    {
    struct sort_run *output_run;
    struct sort_run *runs[2] = { run1, run2 };
    int run_item_size,buf_size;
    int (*run_cmp)(const void *,const void *) = NULL;
    /* cur_runs counts the runs that still have data, num_runs is the fixed array size */
    int cur_runs = 2, num_runs = 2;
    /* index of the next unconsumed item in each run's buffer */
    int run_index_map[2] = {0};
    int cur_output_index = 0;
    static char input_file[] = "foo";
    static char output_file[] = "bar";
    char *output_buffer = NULL;
    int i;
    if(!run1) return run2;
    assert(run1->size == run2->size);
    assert(run1->cmp == run2->cmp);
    run_cmp = run1->cmp;
    run_item_size = run1->size;
    buf_size = run1->buf_size;
    /* run1 has no output file yet: it is an in-memory run, so create the output run */
    if(run1->output_fd < 0)
    {
    output_run = calloc(1, sizeof(*output_run));
    assert(output_run != NULL);
    output_run->size = run_item_size;
    output_run->cmp = run_cmp;
    output_run->buf_size = buf_size;
    output_run->input_fd = -1;
    output_run->output_file[0] = 0;
    strncat(output_run->output_file, output_file, sizeof(output_run->output_file)-1);
    output_run->input_file[0] = 0;
    strncat(output_run->input_file, input_file, sizeof(output_run->input_file)-1);
    }
    else
    {
    /*
    * Swap input with output
    */
    output_run = run1;
    char tmpfile[sizeof(output_run->input_file)];
    close(output_run->output_fd);
    output_run->output_fd = -1;
    tmpfile[0] = 0;
    strncat(tmpfile, output_run->input_file, sizeof(tmpfile)-1);
    output_run->input_file[0] = 0;
    strncat(output_run->input_file, output_run->output_file, sizeof(output_run->input_file)-1);
    output_run->output_file[0] = 0;
    strncat(output_run->output_file, tmpfile, sizeof(output_run->output_file)-1);
    output_run->input_fd = open(output_run->input_file, O_RDONLY);
    assert(output_run->input_fd >= 0);
    /* load the first chunk of the previous output into a fresh buffer */
    output_run->items = calloc(1, buf_size);
    assert(output_run->items != NULL);
    output_run->length = read(output_run->input_fd, output_run->items, buf_size);
    if(output_run->length < 0)
    output_run->length = 0;
    }
    output_run->output_fd = open(output_run->output_file, O_RDWR | O_CREAT | O_TRUNC, 0777);
    assert(output_run->output_fd >= 0);
    output_buffer = calloc(1, buf_size);
    assert(output_buffer != NULL);

    while(cur_runs > 0)
    {
    int min_run = 0;
    void *min_item = NULL;
    /* pick the smallest head item among the runs that still have buffered items */
    for(i = 0; i < num_runs; ++i)
    {
    if(run_index_map[i] >= runs[i]->length/run_item_size)
    continue;
    char *cur_item = (char*)runs[i]->items + run_item_size*run_index_map[i];
    if(!min_item || run_cmp(cur_item, min_item) < 0)
    {
    min_item = cur_item;
    min_run = i;
    }
    }
    /* consume the chosen item and append it to the output buffer */
    run_index_map[min_run]++;
    char *tgt_item = output_buffer + cur_output_index*run_item_size;
    memcpy(tgt_item, min_item, run_item_size);
    ++cur_output_index;
    /*
    * Flush output buffer if at the limit.
    */
    if(cur_output_index >= buf_size/run_item_size)
    {
    write(output_run->output_fd, output_buffer, buf_size);
    cur_output_index = 0;
    }
    /*
    * Check for the limits and, if required, re-read the next chunk of the relevant run from its input file.
    */
    if(run_index_map[min_run] >= runs[min_run]->length/run_item_size)
    {
    if(runs[min_run]->input_fd < 0)
    {
    /* in-memory run with no backing file: it is fully consumed */
    --cur_runs;
    }
    else
    {
    int length = read(runs[min_run]->input_fd, runs[min_run]->items, buf_size);
    if(length <= 0)
    {
    /* nothing more could be read from the input file */
    --cur_runs;
    }
    else
    {
    runs[min_run]->length = length;
    run_index_map[min_run] = 0;
    }
    }
    }
    }
    /*
    * Final flush.
    */
    if(cur_output_index > 0)
    {
    write(output_run->output_fd, output_buffer, cur_output_index*run_item_size);
    fsync(output_run->output_fd);
    }
    /* a reused output run streamed from a file: release its buffer and close that file */
    if(output_run->input_fd >= 0)
    {
    free(output_run->items);
    output_run->items = NULL;
    close(output_run->input_fd);
    output_run->input_fd = -1;
    }
    free(output_buffer);
    return output_run;
    }

    #ifdef DEBUG
    /*
    * Debug helper: print every item of run as an unsigned int, one "[index] = [value]" line each.
    * Items are stepped over in sizeof(int) strides.
    */
    static void dump_result(struct sort_run *run)
    {
        const int total = run->length/run->size;
        char *cursor = run->items;
        int idx = 0;

        while(idx < total)
        {
            printf("[%d] = [%u]\n", idx, *(unsigned int*)cursor);
            cursor += sizeof(int);
            ++idx;
        }
    }
    #else
    /* Without DEBUG, dumping is a no-op. */
    static __inline__ void dump_result(struct sort_run *run)
    {
    }
    #endif

    /*
    * Fold num_runs sorted runs into *output_run by calling merge_run() on two runs at a time.
    *
    * When *output_run is NULL and there are at least two runs, the first two runs are merged
    * to create it; every remaining run is then merged into it in turn. With a single run and
    * a NULL *output_run, merge_run() returns its second argument, so *output_run ends up
    * pointing at runs[0] itself.
    *
    * Returns -1 when num_runs is 0, otherwise 0.
    *
    * TODO: Merge results in 1 shot into multiple files instead of merging 2 at a time.
    */
    static int sort_merge(struct sort_run *runs, int num_runs, struct sort_run **output_run)
    {
        int next = 0;

        if(num_runs == 0) return -1;
    #ifdef DEBUG
        /*
        * First verify that the individual runs are sorted.
        */
        for(int k = 0; k < num_runs; ++k)
        {
            verify_sort(runs+k);
            printf("Verification successful for run [%d]\n", k);
        }
    #endif
        if(num_runs >= 2 && *output_run == NULL)
        {
            *output_run = merge_run(&runs[0], &runs[1]);
            next = 2;
        }
        while(next < num_runs)
        {
            *output_run = merge_run(*output_run, &runs[next]);
            ++next;
        }
        return 0;
    }

    /*
    * Reference benchmark: read size bytes from fd (from the start of the file), sort them as
    * ints with the libc qsort(), verify the result and print the elapsed time.
    *
    * The read offset is a size_t instead of the Linux-specific loff_t, and the printf
    * conversions match the size_t and unsigned long long arguments.
    *
    * Always returns 0.
    */
    static int check_libc_qsort(int fd, size_t size)
    {
        struct sort_run output_run = {0};
        char *buf = calloc(1, size);
        size_t offset = 0;
        ssize_t bytes = 0;
        assert(buf);
        lseek(fd, 0, SEEK_SET);
        /* keep reading until the buffer is full, the file ends, or a read fails */
        while(offset < size && (bytes = read(fd, buf + offset, size - offset)) > 0)
            offset += (size_t)bytes;

        output_run.length = size;
        output_run.items = buf;
        output_run.size = sizeof(int);
        output_run.cmp = int_cmp;
        unsigned long long start = 0, end = 0;
        __time(&start);
        qsort(buf, size/sizeof(int), sizeof(int), int_cmp);
        __time(&end);
        verify_sort(&output_run);
        printf("LIBC qsort time for [%zu] items = [%llu] usecs\n", size/sizeof(int), end - start);
        free(buf);
        return 0;
    }

    /*
    * Return the number of CPUs to use for sorting threads; the result is always >= 1.
    *
    * On Linux, when the CPU_* affinity macros are available, this is the number of CPUs in
    * the calling thread's affinity mask. If that is unavailable or the call fails, the number
    * of online processors reported by sysconf() is used where that query exists. The last
    * resort is 1.
    */
    static int get_cpu_count(void)
    {
    #if defined(__linux__) && defined(CPU_COUNT)
        cpu_set_t set;
        /* start from an empty set so a failed call can never leave it uninitialized */
        CPU_ZERO(&set);
        if(sched_getaffinity(0, sizeof(set), &set) == 0)
        {
            int count = CPU_COUNT(&set);
            if(count > 0) return count;
        }
    #endif
    #ifdef _SC_NPROCESSORS_ONLN
        {
            long online = sysconf(_SC_NPROCESSORS_ONLN);
            if(online > 0) return (int)online;
        }
    #endif
        return 1;
    }

    /*
    * Usage: prsort filename buf_size_in_kb
    *
    * Reads the file in passes of up to one buffer per CPU, sorts the buffers in parallel
    * threads and merges them into the running output with sort_merge(). Afterwards the
    * output file is verified and all resources are released.
    *
    * Exits with 127 on bad arguments or when the file cannot be opened or examined; returns 1
    * when reading the input fails part-way, and 0 otherwise.
    *
    * NOTE(review): when the whole file fits into a single run, sort_merge() leaves output_run
    * pointing at runs[0] and no output file is written -- confirm whether that is intended.
    */
    int main(int argc, char **argv)
    {
        int num_threads;
        pthread_t *tids;
        char filename[0xff+1];
        long buf_kb;
        int buf_size;
        char *endp = NULL;
        int fd;
        struct stat statbuf;
        size_t size;
        int i;
        int rc = 0;

        if(argc != 3)
        {
            fprintf(stderr, "Insufficient args...\n");
            fprintf(stderr, "%s filename buf_size_in_kb\n", argv[0]);
            exit(127);
        }
        filename[0] = 0;
        strncat(filename, argv[1], sizeof(filename)-1);
        buf_kb = strtol(argv[2], &endp, 10);
        if(endp == argv[2] || *endp)
        {
            fprintf(stderr, "Invalid format for buf size [%s]\n", argv[2]);
            exit(127);
        }
        /*
        * A zero buffer would never consume any input, and a value above INT_MAX/1024 would
        * overflow the conversion from KB to bytes below.
        */
        if(buf_kb <= 0 || buf_kb > INT_MAX/1024)
        {
            fprintf(stderr, "Buf size [%s] is out of range\n", argv[2]);
            exit(127);
        }
        buf_size = (int)(buf_kb << 10);/*in kb*/
        fd = open(filename, O_RDONLY);
        if(fd < 0)
        {
            perror("open:");
            exit(127);
        }
        if(fstat(fd, &statbuf))
        {
            perror("fstat:");
            exit(127);
        }
        size = statbuf.st_size;
        if((size_t)buf_size > size)
            buf_size = size;
        /*
        * Allocate threads based on the current cpu count for real parallelization.
        */
        num_threads = get_cpu_count();
        assert(num_threads >= 1);
        struct sort_run *runs = calloc(num_threads, sizeof(*runs));
        assert(runs != NULL);
        g_runs = runs;
        tids = calloc(num_threads, sizeof(*tids));
        assert(tids != NULL);
        unsigned long long start = 0, end = 0;
        struct sort_run *output_run = NULL;
        __time(&start);
        while(size > 0)
        {
            for(i = 0; i < num_threads && size > 0; ++i)
            {
                int length = buf_size;
                if(size < (size_t)buf_size) length = size;
                size -= length;
                if(length != runs[i].length)
                {
                    /* release the buffer of the previous pass before replacing it */
                    free(runs[i].items);
                    runs[i].length = length;
                    runs[i].items = calloc(1, length);
                }
                assert(runs[i].items != NULL);
                runs[i].input_fd = runs[i].output_fd = -1;
                runs[i].size = sizeof(int);
                runs[i].buf_size = buf_size;
                runs[i].cmp = int_cmp;
                if(read(fd, runs[i].items, length) != length)
                {
                    perror("read:");
                    rc = 1;
                    goto out;
                }
            }
            /*
            * Start all the threads at once with the sort load.
            */
            if(i > 1)
            {
                for(int j = 0; j < i; ++j)
                {
                    int err = pthread_create(tids + j, NULL, parallel_sort, runs+j);
                    assert(err == 0);
                    (void)err;
                }
                for(int j = 0; j < i; ++j)
                    pthread_join(tids[j], NULL);
            }
            else
            {
                parallel_sort(runs);
            }

            sort_merge(runs, i, &output_run);
        }
        __time(&end);
        printf("Time taken to sort [%zu] items = [%llu] usecs\n", (size_t)statbuf.st_size/sizeof(int), end-start);

    out:
        /*
        * With a single run sort_merge() hands back runs[0] itself. That memory is owned by
        * the runs array, so it must not be freed a second time through output_run.
        */
        if(output_run == runs)
            output_run = NULL;
        for(i = 0; i < num_threads; ++i)
        {
            free(runs[i].items);
            runs[i].items = NULL;
        }
        free(runs);
        g_runs = NULL;
        if(output_run)
        {
            free(output_run->items);
            output_run->items = NULL;
            verify_sort_file(output_run);
            if(output_run->output_fd >= 0)
                close(output_run->output_fd);
            free(output_run);
        }
        free(tids);
        //check_libc_qsort(fd, statbuf.st_size);
        close(fd);
        return rc;
    }