Skip to content

Instantly share code, notes, and snippets.

@ericychoi
Last active May 30, 2017 19:59
Show Gist options
  • Save ericychoi/a21abe15cab3f3b7b9b79fc5b6575a61 to your computer and use it in GitHub Desktop.
Save ericychoi/a21abe15cab3f3b7b9b79fc5b6575a61 to your computer and use it in GitHub Desktop.
An attempt to implement Async waterfall in AnyEvent perl
package Async;
use warnings;
use strict;
use vars qw($VERSION @ISA @EXPORT @EXPORT_OK %EXPORT_TAGS);
@ISA = qw(Exporter);
@EXPORT_OK = qw(waterfall);
# waterfall: executes multiple subroutines in series, passing the result of each run to the next.
# usage:
# Async::waterfall(
#   subs => [
#     sub { my %args = @_; $args{cb}->( {arg1 => 1} ); },
#     sub { my %args = @_; my $two = $args{arg1}+1; $args{cb}->( {arg3 => $two} ); },
#     sub { my %args = @_; my $four = $args{arg3}*2; $args{cb}->( {final_arg => $four} ); },
#   ],
#   cb => sub {
#     my ($result, $err) = @_; # the final cb receives (\%args_from_last_sub, $err)
#     print $result->{final_arg}; # will print 4;
#   }
# );
# a sub is sub( arg1 => ..., arg2 => ..., cb => sub { ... } )
# cb is expected to get called in this form: cb->({%args_for_next_sub}, $err)
# normally, err is checked before executing the next sub with sub->(%args_for_next_sub).
# cb supplied by prev sub is ignored (since we have to call the next sub)
# if err is truthy, then we call finalizer->(undef, $err)
# if ignore_err is set, we ignore errors and carry on: the next sub receives whatever args hashref the errored sub
# passed to cb (typically none, in which case the next sub only gets cb)
# waterfall: public entry point.
#
# Named arguments:
#   subs       - array ref of code refs to run in series (optional; a missing
#                value is treated as an empty list)
#   cb         - finalizer, called as cb->($final_args, $err)
#   ignore_err - when true, an error reported by a sub does not stop the chain
#   (anything else is forwarded as named arguments to the first sub)
#
# Returns whatever the first step of the pipeline returns.
sub waterfall {
    my %args = @_;

    my $finalizer  = delete $args{cb};
    my $ignore_err = delete $args{ignore_err};

    # An omitted `subs` used to die with "Can't use an undefined value as an
    # ARRAY reference"; treat it as an empty pipeline instead, so the
    # finalizer is invoked straight away with the original arguments.
    my $subs = delete $args{subs};
    $subs = [] unless defined $subs;

    # Work on a copy: _waterfall shifts entries off this list, and the
    # caller's array must stay intact so it can be reused.
    my @subs_left = @{$subs};

    _waterfall(
        sub_args   => \%args,
        subs_left  => \@subs_left,
        finalizer  => $finalizer,
        ignore_err => $ignore_err,
    );
}
# _waterfall: internal driver that runs one step of the pipeline.
#
# Named arguments:
#   sub_args   - hash ref of named arguments for the next sub
#   subs_left  - array ref of subs still to run (consumed with shift)
#   finalizer  - code ref invoked once, as finalizer->($final_args, $err)
#   ignore_err - when true, errors reported by a sub do not abort the chain
sub _waterfall {
    my %state = @_;
    my $queue = $state{subs_left};

    # Nothing left to run: hand the last result to the finalizer. When the
    # last sub supplied no args, a plain true value (1) is passed instead.
    unless (@{$queue} > 0) {
        my $result = defined $state{sub_args} ? $state{sub_args} : 1;
        return $state{finalizer}->($result, undef);
    }

    my $step = shift @{$queue};

    # Continuation handed to the current step as `cb`.
    my $on_done = sub {
        my ($next_args, $err) = @_;

        # Abort the chain on error unless the caller asked to ignore errors.
        return $state{finalizer}->(undef, $err)
            if !$state{ignore_err} && $err;

        # Recurse with the same state, replacing only the args for the next sub.
        _waterfall(%state, sub_args => $next_args);
    };

    # NOTE(review): when the previous step passed undef as its args, this
    # dereference appears to rely on Perl autovivifying an empty hash in
    # sub-argument context -- confirm before restructuring this call.
    $step->(
        %{$state{sub_args}},
        cb => $on_done,
    );
}
1;
package Async::Test;
use lib('/usr/local/sendlib/t/lib');
use warnings;
use strict;
use base qw(Test::Class);
use AnyEvent;
use Test::More;
use Test::Deep;
use Data::Dumper;
use Async;
# Happy path: every sub succeeds, so each one must run exactly once, receive
# the previous sub's result, and the finalizer must see the last result.
sub async_waterfall_success : Test(no_plan) {
    my $self = shift;

    my $done      = AnyEvent->condvar;
    my $sub_count = 10;
    my (@called, @sub_args, $finalizer_called);

    # Each sub records what it received and passes { "arg$i" => $i } onward.
    my @subs = map {
        my $i = $_;
        sub {
            my %args = @_;
            $sub_args[$i] = { %args };
            $called[$i]++;
            $args{cb}->( { "arg$i" => $i }, undef );
        };
    } 0 .. $sub_count - 1;

    my %original_args = 0..9; # dummy args;

    my @finalizer_args;
    my $finalizer = sub {
        @finalizer_args = @_;
        $finalizer_called++;
        $done->send;
    };

    Async::waterfall(%original_args, subs => \@subs, cb => $finalizer);
    $done->recv;

    cmp_deeply(\@called, [ (1) x $sub_count ], 'called every queuer once');
    is($finalizer_called, 1, 'finalizer called');
    cmp_deeply(\@finalizer_args, [ { arg9 => 9 }, undef ], 'with the right args');

    for my $i (0 .. $sub_count - 1) {
        my %received = %{ $sub_args[$i] };
        delete $received{cb};

        # first one contains original_args
        if ($i == 0) {
            cmp_deeply(\%received, \%original_args, '1st sub args match');
            next;
        }

        # every later sub sees only what its predecessor passed on
        my $prev_i = $i - 1;
        cmp_deeply(\%received, { "arg$prev_i" => $prev_i }, 'args match');
    }
}
# Error path: the 3rd sub reports an error.
#   Run 1 (default):    the chain must stop there and the finalizer must be
#                       called with (undef, 'error').
#   Run 2 (ignore_err): the chain must carry on; the sub after the failing one
#                       receives no args besides cb.
sub async_waterfall_error : Test(no_plan) {
    my $self = shift;
    my $cv = AnyEvent->condvar;
    my (@called, @sub_args, @subs, $finalizer_called);
    my $sub_count = 10;
    my $error_sub_index = 2; # 3rd queuer will error

    for my $i (0..$sub_count-1) {
        my $s = sub {
            my %args = @_;
            $called[$i]++;
            $sub_args[$i] = { %args };
            if ($error_sub_index == $i) {
                return $args{cb}->(undef, 'error');
            }
            $args{cb}->( {"arg$i" => $i}, undef );
        };
        push @subs, $s;
    }

    my %original_args = 0..9; # dummy args;
    my @finalizer_args;
    # The closure captures the variable $cv (not its current value), so it
    # signals whichever condvar is assigned at the time it fires.
    my $finalizer = sub {
        @finalizer_args = @_;
        $finalizer_called++;
        $cv->send;
    };

    Async::waterfall(%original_args, subs => \@subs, cb => $finalizer);
    $cv->recv;

    cmp_deeply(\@called, [ (1)x($error_sub_index+1) ], 'called every sub once until failure');
    is($finalizer_called, 1, 'finalizer called');
    cmp_deeply(\@finalizer_args, [undef, 'error'], 'with the right args');

    # first one contains original_args
    my %a = %{$sub_args[0]};
    delete $a{cb};
    cmp_deeply(\%a, \%original_args, '1st sub args match');

    for my $i (1..$sub_count-1) {
        my $prev_i = $i-1;
        if ($i > $error_sub_index) {
            is($sub_args[$i], undef, 'nothing called beyond the errored sub');
            next;
        }
        my %a = %{$sub_args[$i]};
        delete $a{cb};
        cmp_deeply(\%a, { "arg$prev_i" => $prev_i }, 'args match');
    }

    ## repeat with ignore_err
    # Reset each piece of state on its own: in a combined list assignment the
    # first array would swallow every value on the right-hand side.
    @called           = ();
    @sub_args         = ();
    @finalizer_args   = ();
    $finalizer_called = 0;
    # The first condvar has already been sent, so recv on it would return
    # immediately; use a fresh one for the second run.
    $cv = AnyEvent->condvar;

    Async::waterfall(%original_args, subs => \@subs, ignore_err => 1, cb => $finalizer);
    $cv->recv;

    cmp_deeply(\@called, [ (1)x$sub_count ], 'called every queuer once');
    is($finalizer_called, 1, 'finalizer called');
    cmp_deeply(\@finalizer_args, [ { arg9 => 9 }, undef ], 'with the right args');

    # first one contains original_args
    %a = %{$sub_args[0]};
    delete $a{cb};
    cmp_deeply(\%a, \%original_args, '1st sub args match');

    for my $i (1..$sub_count-1) {
        my $prev_i = $i-1;
        my %a = %{$sub_args[$i]};
        delete $a{cb};
        # The sub right after the failing one gets nothing but cb.
        if ($i == $error_sub_index+1) {
            cmp_deeply(\%a, {}, 'on the errored sub');
            next;
        }
        cmp_deeply(\%a, { "arg$prev_i" => $prev_i }, 'args match');
    }
}
1;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment