Created
November 20, 2012 12:24
-
-
Save carun/4117631 to your computer and use it in GitHub Desktop.
Multithreaded application to process files in a directory
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <errno.h> | |
#include <unistd.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <dirent.h> | |
#include <pthread.h> | |
/* Size basis for thread_data.file_name (the buffer is NAME_MAX + 1 bytes).
 * NAME_MAX is also a POSIX <limits.h> macro that system headers may already
 * have defined with a different value; drop any earlier definition so this
 * one is not a conflicting macro redefinition. */
#undef NAME_MAX
#define NAME_MAX (256)
/* #define MAX_THREADS (99) */
/* Seconds main() sleeps when a directory scan finds nothing to process. */
#define SLEEP_DUR (1)
struct thread_data { | |
char file_name[NAME_MAX+1]; | |
}; | |
/* Mutex taken around updates of num_threads. */
static pthread_mutex_t count_mutex;
/* Signalled by a worker right after it decrements num_threads. */
static pthread_cond_t cond;
/* Count of worker threads that main() has started and that have not yet
 * reported completion. */
static int num_threads;
/*
 * scandir() filter: keep every directory entry except "." and "..".
 *
 * The parameter is const-qualified so the function matches the callback type
 * POSIX specifies for scandir(): int (*)(const struct dirent *).
 *
 * Returns 1 to keep the entry, 0 to drop it.
 */
int select_filter( const struct dirent *entry )
{
    /* Filesystem differences with EXT4 and JFS2 - Unable to use d_type member
     * to compare DT_REG to check for the filetype */
    return strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0;
}
void *process_file( void *arg ) | |
{ | |
struct thread_data *data_t = (struct thread_data *)arg; | |
unsigned int i, j; | |
pthread_t p = pthread_self(); | |
/* Do some processing with data_t */ | |
fprintf( stdout, "%ld: Processing file %s\n", (unsigned long int)p, data_t->file_name ); | |
unlink( data_t->file_name ); | |
/* When done processing, free the resources */ | |
free( data_t ); | |
for (i=0; i< 9999; i++) | |
for (j=0; j< 9999; j++); | |
/* fprintf( stdout, "%p: Locking Mutex\n", (void *)p); */ | |
pthread_mutex_lock( &count_mutex ); | |
/* Critical section */ | |
num_threads --; | |
/* fprintf( stdout, "%p: Signaling\n", (void *)p); */ | |
pthread_cond_signal( &cond ); | |
/* fprintf( stdout, "%p: Unlocking Mutex\n", (void *)p); */ | |
fprintf( stdout, "%d: Exiting. num_threads: %d\n", p, num_threads); | |
pthread_mutex_unlock( &count_mutex ); | |
pthread_exit( NULL ); | |
return NULL; | |
} | |
int main(int argc, const char *argv[]) | |
{ | |
struct dirent **file_list = NULL; | |
short int no_of_files = 0; | |
char *dir_name = "/u/home/raddanki/Arun/Threads/data"; | |
short int idx = 0; | |
struct thread_data *data_t = NULL; | |
int MAX_THREADS = 0; | |
pthread_t tid = -1; | |
pthread_attr_t attr; | |
/* Get the arguments */ | |
if (argc < 2) { | |
printf( "Usage: %s <no-of-threads>\n", argv[0] ); | |
exit(0); | |
} | |
MAX_THREADS = atoi( argv[1] ); | |
if (MAX_THREADS < 1 && MAX_THREADS > 99) { | |
printf( "Max threads should be between 1 and 99. Provided: %d\n", MAX_THREADS ); | |
exit(0); | |
} | |
/* Initialize thread attributes */ | |
pthread_cond_init( &cond, NULL ); | |
pthread_mutex_init( &count_mutex, NULL ); | |
pthread_attr_init( &attr ); | |
pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED ); | |
while (1) { | |
while (no_of_files == 0) { | |
/* Open the dir for reading */ | |
pthread_mutex_lock( &count_mutex ); | |
if (num_threads == 0) { | |
if (file_list) | |
free( file_list ); | |
if ((no_of_files = scandir( dir_name, &file_list, select_filter, alphasort )) == -1) { | |
fprintf( stdout, "Failed to open dir: %s\n", dir_name ); | |
/* need to wait for threads to complete and then exit */ | |
exit (1); | |
} | |
} | |
pthread_mutex_unlock( &count_mutex ); | |
idx = no_of_files - 1; | |
if (!no_of_files) { | |
#ifdef DEBUG | |
if (num_threads == 0) { | |
free( file_list ); | |
pthread_attr_destroy( &attr ); | |
pthread_mutex_destroy( &count_mutex ); | |
pthread_cond_destroy( &cond ); | |
pthread_exit(0); | |
} | |
#endif | |
fprintf( stdout, "Going to sleep now\n" ); | |
sleep(SLEEP_DUR); | |
} | |
} | |
/* fprintf( stdout, "MAIN: Locking Mutex\n" ); */ | |
/* while (1) { */ | |
pthread_mutex_lock( &count_mutex ); | |
if (num_threads == MAX_THREADS) { | |
/* Let the threads finish */ | |
/* fprintf( stdout, "MAIN: Waiting on condition\n" ); */ | |
pthread_cond_wait( &cond, &count_mutex ); | |
fprintf( stdout, "MAIN: Waking up from condition\n" ); | |
/* fprintf( stdout, "MAIN: Unlocking Mutex\n" ); */ | |
} | |
pthread_mutex_unlock( &count_mutex ); | |
/* else */ | |
/* break; */ | |
/* } */ | |
/* Be sure about the boundaries in File[] */ | |
if (num_threads > MAX_THREADS) { | |
fprintf( stdout, "Prevented severe memory error %d\n", num_threads ); | |
exit(2); | |
} | |
if (((data_t = calloc( 1, sizeof(struct thread_data) )) == NULL)) { | |
fprintf( stdout, "Failed to allocate memory\n" ); | |
exit(1); | |
} | |
snprintf( data_t->file_name, sizeof(data_t->file_name), "%s/%s", dir_name, file_list[idx]->d_name ); | |
free(file_list[idx--]); | |
no_of_files--; | |
fprintf( stdout, "MAIN: No of files: %d\n", no_of_files ); | |
/* Manage the thread stuff */ | |
if (pthread_create( &tid, &attr, process_file, (void *)data_t ) != 0) { | |
fprintf( stdout, "Failed to allocate memory for thread\n" ); | |
exit(1); | |
} | |
fprintf( stdout, "Created thread: %d\nMAIN: Locking Mutex: %d\n", tid, num_threads ); | |
pthread_mutex_lock( &count_mutex ); | |
num_threads++; | |
/* fprintf( stdout, "MAIN: Unlocking Mutex 2 - %d\n", num_threads ); */ | |
pthread_mutex_unlock( &count_mutex ); | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment