Perl Mojolicious: limit concurrency with all_settled

In Mojolicious for Perl, is there a way to limit concurrency when using Promise->all_settled?

In the example below, I would like to limit concurrency to 10. I'm using sleep to simulate a blocking operation:

use Mojolicious::Lite -signatures, -async_await;

helper isOdd_p => sub($self, $number)
{
    return Mojo::Promise->new(sub($resolve, $reject ) {
        Mojo::IOLoop->subprocess(
            sub {
                sleep 1;
                $number % 2;
            },
            sub ($subprocess, $err, @res ) {
                return $reject->( $err ) if $err;
                return $reject->( @res ) if @res && $res[0]==0; # reject Even
                $resolve->( @res );
            })
            
        });
};

any '/' => async sub ($c) {
    $c->render_later();
    my @promises = map { $c->isOdd_p($_) } (0..50);
    my @results = eval { await Mojo::Promise->all_settled(@promises) };
    #my @results = eval { await Mojo::Promise->map({concurrency=>10}, sub { $c->isOdd_p($_)}, (0..50) ) };
    if (my $err = $@) {
        $c->render(json => $@); # will this line be ever reached with all_settled??
    } else {
        $c->render(json => [ @results ] );
    }
};

app->start;

Best answer, by ikegami

Currently, you start all 51 tasks, then wait for all 51 tasks to finish.

Limiting concurrency to 10 would mean starting 10 tasks, waiting for one to finish, starting one, waiting for one to finish, etc.
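For tasks that never reject, the built-in Mojo::Promise->map with the concurrency option already schedules work this way. Here is a minimal sketch that measures it, assuming a hypothetical task_p helper that resolves after a short timer (the helper, the counters and the 0.01 second delay are illustrative and not part of the question's code):

```perl
use strict;
use warnings;
use Mojo::IOLoop;
use Mojo::Promise;

my ($running, $peak) = (0, 0);

# Hypothetical stand-in for a slow task: resolves with its item after a timer.
sub task_p {
  my $item = shift;
  $running++;
  $peak = $running if $running > $peak;    # highest number of tasks in flight
  my $promise = Mojo::Promise->new;
  Mojo::IOLoop->timer(0.01 => sub { $running--; $promise->resolve($item) });
  return $promise;
}

my @results;
Mojo::Promise->map({concurrency => 10}, sub { task_p($_) }, 0 .. 50)
  ->then(sub { @results = map { $_->[0] } @_ })    # one array ref per task
  ->wait;

print "tasks finished: ", scalar(@results), ", peak concurrency: $peak\n";
```

The catch is that map is built on all, so the first rejection settles the whole batch, which is why it cannot be used as-is here.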

A few small changes to Mojo::Promise's map suffice: the two calls to all become all_settled, the rejection handling keeps going instead of stopping, and it becomes a plain sub instead of a Mojo::Promise method.

sub map_all_settled {
  my ($class, $options, $cb, @items) = ('Mojo::Promise', ref $_[0] eq 'HASH' ? shift : {}, @_);
 
  return $class->all_settled(map { $_->$cb } @items) if !$options->{concurrency} || @items <= $options->{concurrency};
 
  my @start = map { $_->$cb } splice @items, 0, $options->{concurrency};
  my @wait  = map { $start[0]->clone } 0 .. $#items;
 
  my $start_next = sub {
    return () unless @items;    # test the queue length, so a false item such as 0 is still processed
    my $item = shift @items;
    my ($start_next, $chain) = (__SUB__, shift @wait);
    $_->$cb->then(sub { $chain->resolve(@_); $start_next->() }, sub { $chain->reject(@_); $start_next->() }) for $item;
    return ();
  };
 
  $_->then($start_next, $start_next) for @start;    # keep going whether an initial task fulfills or rejects
 
  return $class->all_settled(@start, @wait);
}

Then, in the route handler:

my @results = await map_all_settled({concurrency => 10}, sub { $c->isOdd_p($_) }, 0 .. 50);
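
Since all_settled fulfills with one hash reference per promise (status plus either value or reason), the results still need to be sorted into successes and failures. A minimal sketch, using already-settled promises as a stand-in for isOdd_p (the @odd and @even names are only for illustration):

```perl
use strict;
use warnings;
use Mojo::Promise;

# Stand-in for isOdd_p: odd numbers fulfill, even numbers reject.
my @promises = map {
  my $n = $_;
  $n % 2 ? Mojo::Promise->resolve($n) : Mojo::Promise->reject($n);
} 0 .. 4;

my (@odd, @even);
Mojo::Promise->all_settled(@promises)->then(sub {
  for my $outcome (@_) {
    if ($outcome->{status} eq 'fulfilled') {
      push @odd, $outcome->{value}[0];     # value is an array ref
    }
    else {
      push @even, $outcome->{reason}[0];   # reason is an array ref
    }
  }
})->wait;

print "odd: @odd; even: @even\n";
```

The outcomes come back in the same order as the promises that were passed in, so they can be matched to the original items by index.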