Skip to content

Instantly share code, notes, and snippets.

@davisjam
Created January 5, 2018 18:17

Revisions

  1. davisjam created this gist Jan 5, 2018.
    842 changes: 842 additions & 0 deletions distributed-map.pl
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,842 @@
    #!/usr/bin/env perl
    # Author: Jamie Davis <[email protected]>
    # Description:
    # - Perform a bunch of operations, using a cluster of worker nodes.
    # - To cleanly exit early, check the first few lines of stderr for a file to touch.

    # Dependencies.
    use strict;
    use warnings;

    use threads;
    use threads::shared;

    use JSON::PP;
    use Getopt::Long;
    use Time::HiRes qw( usleep );
    use Carp;

    # Globals.
    my %globals;

    my $LOG_LOCK : shared; # Logging.

    my $WORKERS_DONE : shared; # Worker management.
    $WORKERS_DONE = 0;

    my $TASK_LOCK : shared; # Getting and delivering tasks.
    my $tasksLoaded = 0;
    my $TASKS : shared; # array ref

    my $RESULT_LOCK : shared; # Emitting results.
    my $RESULT_FH;
    my $resultFHOpened : shared;
    $resultFHOpened = 0;

    my $exitEarlyFile = "/tmp/distributed-work-exit-early-pid$$"; # External signaling.
    unlink $exitEarlyFile;
    my $EXIT_EARLY : shared;
    $EXIT_EARLY = 0;

    my $NO_TASKS_LEFT = -1;

    # Process args.
    my $invocation = "$0 " . join(" ", @ARGV);
    my %args;
    GetOptions(\%args,
    "cluster=s",
    "workScript=s",
    "taskFile=s",
    "resultFile=s",
    "workers=s@",
    "notWorkers=s@",
    "unusedCores=i",
    "copy=s",
    "PATHprefix=s",
    "verbose",
    "help",
    ) or die "Error parsing args\n";
    %globals = &processArgs(%args);
    &log("Invocation: $invocation");

    my @workerNames = map { $_->{host} } @{$globals{cluster}};
    &log(scalar(@{$globals{cluster}}) . " workers: <@workerNames>");

    # Load tasks.
    &log("Loading tasks from $globals{taskFile}");
    $TASKS = shared_clone([&loadTasks("taskFile"=>$globals{taskFile})]);

    &log("Opening resultFile $globals{resultFile}");
    $RESULT_FH = &openResultFile($globals{resultFile});

    # Start and await workers.
    &log("Starting workers");
    &log("To cleanly exit early, run the following command:\n touch $exitEarlyFile");
    &runWorkers("cluster"=>$globals{cluster});
    &log("See results in $globals{resultFile}");
    &noMoreResults();

    # Write out any remaining tasks.
    if ($EXIT_EARLY) {
    &log("Exited early, writing out any remaining tasks");
    &writeOutRemainingTasks();
    }

    exit(0);

    # Launch and await workers on all workers
    #
    # input: %args keys: cluster
    sub runWorkers {
    my (%args) = @_;
    &assertUsage("runWorkers: Error, usage: hash with keys: cluster", $args{cluster});

    # Propagate copy
    if ($globals{copy_src} and $globals{copy_dest}) {
    &log("runWorkers: Propagating copy");
    &propagateCopyDir("src"=>$globals{copy_src}, "dest"=>$globals{copy_dest}, "destAccessCreds"=>$args{cluster});
    }

    # Start workers
    &log("runWorkers: Starting workers");
    my @my_threads;
    my $thread_exitEarly;
    for my $worker (@{$args{cluster}}) {
    my $unusedCores = &min($globals{unusedCores}, $worker->{logicalCores});
    if ($worker->{unusedCores}) { # Override global recommendation?
    $unusedCores = $worker->{unusedCores};
    }
    my $usedCores = $worker->{logicalCores} - $unusedCores;

    &log("runWorkers: Starting $usedCores processes on worker $worker->{host}, leaving $unusedCores unused cores");
    for my $core (1 .. $usedCores) {
    my %workerWithID = %{$worker};
    $workerWithID{id} = $core;
    &log("runWorkers: $workerWithID{host}:$workerWithID{id}");
    push @my_threads, threads->create(\&thread_worker, \%workerWithID);
    }
    }

    $thread_exitEarly = threads->create(\&thread_exitEarly, ($exitEarlyFile));

    &log("runWorkers: Waiting on my " . scalar(@my_threads) . " threads");
    for my $thr (@my_threads) {
    &log("runWorkers: thread finished!");
    $thr->join();
    }

    # If not already done, signal $thread_exitEarly.
    { lock($WORKERS_DONE);
    $WORKERS_DONE = 1;
    }

    &log("runWorkers: Waiting on thread_exitEarly");
    $thread_exitEarly->join();

    &log("runWorkers: Done");
    return;
    }

    sub getRemainingTasks {
    # Extract all remaining tasks.
    my @remainingTasks;
    while (1) {
    my $t = &getNextTask();
    last if ($t eq $NO_TASKS_LEFT);
    push @remainingTasks, $t;
    }

    my @strings = map { &task_toString($_) } @remainingTasks;
    &log("getRemainingTasks: Got " . scalar(@remainingTasks) . " remaining tasks: <@strings>");
    return @remainingTasks;
    }

    # input: ()
    # output: ($anyRemainingTasks)
    sub writeOutRemainingTasks {
    # A file to write to.
    my $remainingWorkFile = "/tmp/distributed-work-remainingWork-pid$$\.txt";
    unlink $remainingWorkFile;

    my @remainingTasks = &getRemainingTasks();
    if (@remainingTasks) {
    # Convert back to JSON and write out.
    my @lines = map { encode_json($_->{task}) } @remainingTasks;
    my $contents = join("\n", @lines);
    &writeToFile("file"=>$remainingWorkFile, "contents"=>$contents);
    &log(scalar(@remainingTasks) . " tasks remaining, see $remainingWorkFile");
    }

    return (0 < scalar(@remainingTasks));
    }

    # Thread.
    # Get and do tasks until none remain, then return.
    #
    # input: ($worker) worker from &getClusterInfo, with extra key 'id'
    # output: ()
    sub thread_worker {
    my ($worker) = @_;

    my $tid = threads->tid();

    my $workerStr = "$worker->{host}:$worker->{id}";
    my $logPref = $workerStr;

    for (my $taskNum = 0; ; $taskNum++) {
    # Should we EXIT_EARLY?
    {
    lock($EXIT_EARLY);
    if ($EXIT_EARLY) {
    &log("$logPref: exiting early");
    return;
    }
    }

    # Get and complete a task.
    my $task = &getNextTask();
    if ($task eq $NO_TASKS_LEFT) {
    &log("$logPref: No tasks left");
    last;
    }
    &log("$logPref: task " . scalar(&task_toString($task)));

    my $out = &work("worker"=>$worker, "task"=>$task);
    &log("$logPref: completed task $task->{id}. Output: $out");
    my $result = { "task"=>$task, "worker"=>$workerStr, "output"=>$out };
    &emitResult($result);
    }

    return;
    }

    # input: %args: keys: src dest worker
    # output: $dest
    #
    # Transfer the specified src
    sub transferFile {
    my %args = @_;
    &assertUsage("transferFile: usage: src dest worker", $args{src}, $args{dest}, $args{worker});

    my ($out, $rc) = &cmd("scp -P$args{worker}->{port} $args{src} $args{worker}->{user}\@$args{worker}->{host}:$args{dest}");
    return $args{dest};
    }

    # input: %args: keys: file contents
    # output: $file
    sub writeToFile {
    my %args = @_;
    &assertUsage("writeToFile: usage: file contents", $args{file}, $args{contents});

    open(my $fh, '>', $args{file});
    print $fh $args{contents};
    close $fh;

    return $args{file};
    }

    # input: (%args) keys: worker task
    # worker entry from &getClusterInfo
    # task created by &createTask
    # output: ($out)
    sub work {
    my %args = @_;
    &assertUsage("work: usage: worker task", $args{worker}, $args{task});

    my $tid = threads->tid();

    # Create task file.
    my $localTaskFile = "/tmp/parallel-process-repo-$$\_$tid-LOCAL.json";
    my $remoteTaskFile = "/tmp/parallel-process-repo-$$\_$tid-REMOTE.json";
    &writeToFile("file"=>$localTaskFile, "contents"=>encode_json($args{task}->{task}));
    &transferFile("src"=>$localTaskFile, "dest"=>$remoteTaskFile, "worker"=>$args{worker});
    unlink $localTaskFile;

    # Process task remotely and log output.
    my $PATHprefix = $globals{PATHprefix} ? "PATH=$globals{PATHprefix}:\$PATH" : "";
    my $remoteCmd = "$PATHprefix $globals{workScript} $remoteTaskFile 2>/dev/null; rm $remoteTaskFile";
    my $out = &remoteCommand("accessCreds"=>$args{worker}, "command"=>$remoteCmd);
    unlink $remoteTaskFile;

    return $out;
    }

    # Thread.
    # Forever: Check whether we should exit early.
    # If so, set $EXIT_EARLY and then return.
    # Otherwise, if workers have finished ($WORKERS_DONE), return.
    #
    # input: ($exitEarlyFile)
    # output: ()
    sub thread_exitEarly {
    my ($exitEarlyFile) = @_;

    while (1) {

    # Exit early?
    if (-f $exitEarlyFile) {
    { lock($EXIT_EARLY);
    $EXIT_EARLY = 1;
    }
    last;
    }

    # Workers done?
    my $done;
    { lock($WORKERS_DONE);
    $done = $WORKERS_DONE;
    }
    if ($done) {
    last;
    }

    usleep(100*1000); # 100 ms
    }
    }


    # input: ($clusterFile)
    # output: @cluster: list of node objects with keys: host user port logicalCores
    sub getClusterInfo {
    my ($clusterFile) = @_;
    &assertUsage("getClusterInfo: Error, usage: (clusterFile)", $clusterFile);

    if (not -f $clusterFile or $clusterFile !~ m/\.json$/i) {
    die "getClusterInfo: Error, invalid clusterFile <$clusterFile>\n";
    }

    my ($out, $rc) = &cmd("cat $clusterFile 2>/dev/null");
    if ($rc) {
    die "getClusterInfo: Error, could not read clusterFile <$clusterFile>: $!\n";
    }

    my $cluster = eval {
    return decode_json($out);
    };
    if ($@) {
    die "getClusterInfo: Error parsing clusterFile <$clusterFile>: $@\n";
    }

    # Confirm nodes are valid.
    my @cluster = @$cluster;
    my $i = 0;
    for my $node (@cluster) {
    if (not &_isClusterNodeValid($node)) {
    die "getClusterInfo: Error, node $i is invalid (0-indexed)\n";
    }
    $i++;
    }

    # Augment with a "logicalCores" field if none provided
    for my $node (@cluster) {
    if (not defined $node->{logicalCores}) {
    my $out = &remoteCommand("accessCreds"=>$node, "command"=>"nproc");
    if ($out =~ m/^(\d+)$/) {
    $node->{logicalCores} = int($out);
    }
    else {
    die "getClusterInfo: Error, could not get logical cores for node $node->{host}:\n$out\n";
    }
    }
    &log("$node->{host} has $node->{logicalCores} logical cores");
    }

    return @$cluster;
    }

    # input: ($clusterNode)
    # output: ($isValid)
    sub _isClusterNodeValid {
    my ($node) = @_;
    my @keys = ("host", "port", "user");
    if (not $node) {
    return 0;
    }

    for my $key (@keys) {
    if (not $node->{$key}) {
    return 0;
    }
    }

    return 1;
    }

    ###
    # Usage message, arg parsing.
    ###

    sub getTerseUsage {
    my $terseUsage = "Usage: $0 --cluster C.json --workScript W --taskFile F
    [--resultFile R] [--workers w1,...] [--notWorkers w1,...]
    [--unusedCores N]
    [--copy src:dest] [--PATHprefix dir1:...]
    [--verbose] [--help]
    ";
    return $terseUsage;
    }


    sub shortUsage {
    print &getTerseUsage();
    exit 0;
    }

    sub longUsage {
    my $terseUsage = &getTerseUsage();

    print "Description: Distribute tasks across workers
    $terseUsage
    --cluster C.json JSON-formatted cluster of workers
    Should be an array of \"node\" objects with minimal keys: host port user [logicalCores] [unusedCores]
    host, port, user: suitable for passwordless ssh
    [logicalCores]: skip query of node for # logical cores
    [unusedCores]: override global --unusedCores
    --workScript W Script to execute against each task
    **Must exist on every worker**
    Argument is a filename, its stdout is saved in resultFile
    --taskFile F One task per line, JSON-encoded.
    [--resultFile R] One result per line, NOT guaranteed in the same order
    JSON-encoded.
    [--workers w1,... | --notWorkers w1,... ] Workers to use | workers not to use
    [--unusedCores N] Cores to leave available on a worker
    [--copy src:dest] Copy src to dest on every worker before running workScript
    Must be that src != dest. Can be a file or a dir.
    [--PATHprefix dir1:...] Prefix PATH with this string when executing workScript
    [--verbose]
    [--help]
    ";
    }

    # Process args after GetOptions, ensure validity, etc.
    #
    # input: (%args) from GetOptions
    # output: (%globals) with keys:
    # cluster listref of hashrefs representing nodes to use in the worker cluster
    # workScript script to execute
    # taskFile one task per line
    # resultFile one result per line
    # unusedCores cores to leave idle on each worker
    # [copy_src dir on manager]
    # [copy_dest dir on worker]
    # [PATHprefix prefix for PATH when invoking workScript]
    # verbose extra loud
    sub processArgs {
    my %args = @_;
    my $invalidArgs = 0;

    # Bail out on no args or help.
    if (not scalar(keys %args)) {
    &shortUsage();
    exit 0;
    }

    if ($args{help}) {
    &longUsage();
    exit 0;
    }

    # cluster
    my @cluster;
    if ($args{cluster} and -f $args{cluster} and $args{cluster} =~ m/\.js(on)?$/i) {
    &log("Cluster file $args{cluster}");
    @cluster = &getClusterInfo($args{cluster});

    # workers
    if ($args{workers}) {
    $args{workers} = [split(",", join(",", @{$args{workers}}))]; # --w x,y --w z becomes x,y,z
    }
    else {
    $args{workers} = [map { $_->{host} } @cluster]; # Default to include all
    }

    # Filter in workers
    &log("Filtering in workers <@{$args{workers}}>");
    @cluster = grep { &listContains($args{workers}, $_->{host}) } @cluster;
    # my @filteredCluster;
    # for my $maybeNode (@cluster) {
    # if (&listContains($args{workers}, $maybeNode->{host})) {
    # push @filteredCluster, $maybeNode;
    # }
    # }
    # @cluster = @filteredCluster;

    # notWorkers
    if ($args{notWorkers}) {
    $args{notWorkers} = [split(",", join(",", @{$args{notWorkers}}))]; # --w x,y --w z becomes x,y,z
    }
    else {
    $args{notWorkers} = []; # Default to exclude none
    }

    # Filter out notWorkers
    &log("Filtering out workers <@{$args{notWorkers}}>");
    @cluster = grep { not &listContains($args{notWorkers}, $_->{host}) } @cluster;
    # @filteredCluster = ();
    # for my $maybeNode (@cluster) {
    # if (not &listContains($args{notWorkers}, $maybeNode->{host})) {
    # push @filteredCluster, $maybeNode;
    # }
    # }
    # @cluster = @filteredCluster;

    my @survivingNames = map { $_->{host} } @cluster;
    if (@survivingNames) {
    &log("Using " . scalar(@cluster) . " workers <@survivingNames>");
    }
    else {
    &log("Error, no workers");
    $invalidArgs = 1;
    }

    }
    else {
    &log("Error, invalid cluster file");
    $invalidArgs = 1;
    }

    # workScript
    if ($args{workScript}) {
    &log("Using workScript $args{workScript}");
    # Can't use -f.
    # workScript might not exist *anywhere* yet before we honor --copy.
    # And this node might not be a worker so -f would still fail.
    }

    # taskFile
    if ($args{taskFile} and -f $args{taskFile}) {
    &log("Using taskFile $args{taskFile}");
    }
    else {
    &log("Error, invalid taskFile");
    $invalidArgs = 1;
    }

    # resultFile
    if (not $args{resultFile}) {
    $args{resultFile} = "/tmp/parallel-work-pid$$-results";
    }
    unlink $args{resultFile};
    &log("Using resultFile $args{resultFile}");

    # unusedCores
    if (not defined $args{unusedCores}) {
    $args{unusedCores} = 0;
    }

    if (0 <= $args{unusedCores}) {
    &log("Using unusedCores $args{unusedCores}");
    }
    else {
    &log("Error, invalid unusedCores $args{unusedCores}");
    $invalidArgs = 1;
    }

    # copy
    my ($copySrc, $copyDest);
    if (defined $args{copy}) {
    my @spl = split(":", $args{copy});
    if (scalar(@spl) == 2) {
    my ($src, $dest) = @spl;
    if ($src ne $dest and -e $src) {
    &log("Using copySrc $src copyDest $dest");
    $copySrc = $src;
    $copyDest = $dest;
    }
    else {
    &log("Error, invalid copy src $src dest $dest. Must have src != dest, and src must exist");
    $invalidArgs = 1;
    }
    }
    else {
    &log("Error, malformed copy. Must be src:dest");
    $invalidArgs = 1;
    }
    }

    # PATHprefix
    if (not defined $args{PATHprefix}) {
    $args{PATHprefix} = "";
    }

    # verbose
    if (not defined $args{verbose}) {
    $args{verbose} = 0;
    }

    # Error out on invalid args.
    if ($invalidArgs) {
    &shortUsage();
    exit 1;
    }

    my %globals = ("cluster" => \@cluster,
    "workScript" => $args{workScript},
    "taskFile" => $args{taskFile},
    "resultFile" => $args{resultFile},
    "unusedCores" => $args{unusedCores},
    "copy_src" => $copySrc,
    "copy_dest" => $copyDest,
    "PATHprefix" => $args{PATHprefix},
    "verbose" => $args{verbose},
    );
    return %globals;
    }

    ###
    # Task management.
    ###

    # input: (%args) keys: taskFile
    # output: (@tasks)
    # Each elt contains a task from &task_create,
    # where the 'task' is a json-decoded version of one of the lines from the taskFile.
    sub loadTasks {
    my %args = @_;
    &assertUsage("Usage: taskFile", $args{taskFile});

    lock($TASK_LOCK);
    return if ($tasksLoaded);
    $tasksLoaded = 1;

    my @taskLines = `cat $args{taskFile}`;
    chomp @taskLines;

    my $_id = 1; # 1-indexed for sanity

    my @tasks;
    for my $line (@taskLines) {
    &log("Task line: <$line>");

    my $task = decode_json($line);
    my $id = $_id;
    push @tasks, &task_create("id"=>$id, "task"=>$task);

    $_id++;
    }

    return @tasks;
    }

    # Get the next task from @TASKS.
    #
    # input: ()
    # output: ($task) or $NO_TASKS_LEFT
    # $task as created by &task_create.
    sub getNextTask {
    lock($TASK_LOCK);
    &assert(($tasksLoaded), "getNextTask: Tasks never loaded");

    if (not @{$TASKS}) {
    return $NO_TASKS_LEFT;
    }

    my $nextTask = shift @{$TASKS};
    &log("getNextTask: got <" . &task_toString($nextTask) . ">, " . scalar(@${TASKS}) . " tasks remaining");
    return $nextTask;
    }

    # input: (%args) keys: id task
    # output: ($task) a ref with keys: id task
    sub task_create {
    my (%args) = @_;
    &assertUsage("createTask: usage: id task", $args{id}, $args{task});

    return { "id" => $args{id},
    "task" => $args{task},
    };
    }

    sub task_toString {
    my ($task) = @_;

    return "$task->{id}: " . encode_json($task->{task});
    }

    ###
    # Result management.
    ###

    # input: ($resultFile)
    # output: ($FH)
    sub openResultFile {
    my ($resultFile) = @_;

    lock($RESULT_LOCK);

    if (not $resultFHOpened) {
    &log("emitResult: Opening $resultFile for results");
    open($RESULT_FH, ">", $resultFile) or confess "Error, could not open resultFile: $!\n";
    $resultFHOpened = 1;
    }

    return $RESULT_FH;
    }

    # input: ($result) result object with keys: task workerInfo output
    # task: from &getNextTask
    # result: command-line output
    # output: ()
    sub emitResult {
    my ($result) = @_;

    lock($RESULT_LOCK);

    if (not $resultFHOpened) {
    confess "Error, must call openResultFile first\n";
    }

    my $encodedResult = encode_json($result);

    &log("emitResult: Emitting: <$encodedResult>");
    print $RESULT_FH "$encodedResult\n";

    return;
    }

    sub noMoreResults {
    if ($RESULT_FH) {
    close($RESULT_FH) or &log("Error, closing result_fh failed: $!");
    }
    }

    ###
    # Handle copy
    ###

    # input: (%args) keys: src dest destAccessCreds
    # destAccessCreds: array ref of hashrefs of destAccessCreds for use with remoteCopy
    # output: ()
    sub propagateCopyDir {
    my %args = @_;

    my @helpers;

    # Parallel propagation.
    for my $cred (@{$args{destAccessCreds}}) {
    my %args = ("src"=>$globals{copy_src}, "dest"=>$globals{copy_dest}, "destAccessCreds"=>$cred);
    my $thr = threads->create(sub {
    my %args = @_;
    my $rc = &remoteCopy("src"=>$args{src}, "dest"=>$args{dest}, "destAccessCreds"=>$cred);
    return $rc;
    }, %args);
    push @helpers, $thr;
    }

    # Confirm that all propagation succeeded.
    my @results;
    for my $helper (@helpers) {
    push @results, $helper->join();
    }

    if (grep { $_ ne 0 } @results) {
    confess "Error, at least one copy failure: <@results>\n";
    }

    return;
    }

    ###
    # Utility
    ###

    # input: (\@list, $e)
    # output: true if $e is in @list, else false
    sub listContains {
    my ($list, $e) = @_;
    for my $elt (@$list) {
    if ($elt eq $e) {
    return 1;
    }
    }

    return 0;
    }

    sub min {
    my (@nums) = @_;

    my $min = $nums[0];
    for my $n (@nums) {
    if ($n < $min) {
    $min = $n;
    }
    }

    return $min;
    }

    sub max {
    my (@nums) = @_;

    my $max = $nums[0];
    for my $n (@nums) {
    if ($max < $n) {
    $max = $n;
    }
    }

    return $max;
    }

    sub assert {
    my ($cond, $msg) = @_;
    if (not $cond) {
    print "ERROR: $msg\n";
    exit 1;
    }
    }

    # input: ($msg, @varsThatShouldBeDefined)
    # output: ()
    sub assertUsage {
    my ($msg, @shouldBeDefined) = @_;

    my @undefined = grep { not defined $_ } @shouldBeDefined;
    &assert((not @undefined), $msg);
    }

    # input: ($cmd)
    # output: ($out, $rc)
    sub cmd {
    my ($cmd) = @_;
    &log($cmd);
    my $out = `$cmd 2>&1`;
    return ($out, $? >> 8);
    }

    sub log {
    my ($msg) = @_;
    my $now = localtime;
    lock($LOG_LOCK);
    print STDERR "$now: $msg\n";
    }

    # input: (%args) keys: accessCreds command
    # accessCreds: hashref, keys: port user host
    # hint: a worker object can be used as accessCreds
    # command: string to execute over ssh
    # output: ($out)
    sub remoteCommand {
    my %args = @_;
    &assertUsage("remoteCommand: Error, usage: hash with keys: accessCreds command", $args{accessCreds}, $args{command});

    my $cmd = "ssh -p$args{accessCreds}->{port} $args{accessCreds}->{user}\@$args{accessCreds}->{host} '$args{command}'";
    my ($out, $rc) = &cmd($cmd); # rc is from ssh, not from command, so don't bother returning it.
    return ($out);
    }

    # Copy local file to remote host
    # Recursively copy dirs
    #
    # input: (%args) keys: src dest destAccessCreds
    # src/dest: files or dirs
    # destAccessCreds: hashref, keys: port user host
    # output: ($rc) 0 success, non-zero failure
    sub remoteCopy {
    my %args = @_;

    my $cmd = "scp -r -P$args{destAccessCreds}->{port} $args{src} $args{destAccessCreds}->{user}\@$args{destAccessCreds}->{host}:$args{dest}";
    my ($out, $rc) = &cmd($cmd); # rc is from ssh, not from command, so don't bother returning it.
    return ($rc);
    }