I need to construct the following flow:
- Accept list of file names
- Extract multiple lines from those files
- Process those lines
However, I have no idea how to properly inject gather-take into map:

    sub MAIN ( *@file-names ) {
        @file-names.map( { slip parse-file( $_ ) } ).map( { process-line( $_ ) } );
    }

    sub parse-file ( $file-name ) {
        return gather for $file-name.IO.lines -> $line {
            take $line if $line ~~ /a/; # dummy example logic
        }
    }

    sub process-line ( $line ) {
        say $line; # dummy example logic
    }

This code works but leaks memory like crazy. I assume slip makes gather-take eager? Or slip does not mark Seq items as consumed? Is there a way to slip a gather-take result into map in a lazy manner?
BTW: My intent is to parallelize each step with race later, so that, for example, I have 2 files parsed at the same time, producing lines for 10 line processors. Generally speaking, I'm trying to figure out the easiest way of composing such cascading flows. I've tried Channels to connect each processing step, but they have no built-in pushback. If you know any other patterns for such flows, comments are more than welcome.
EDIT 1:
I think my code is correct, and the memory leak is caused not by bad logic but by a bug in the Slip class. I've created issue https://github.com/rakudo/rakudo/issues/5138, which is currently open. I'll post an update once it is resolved.
EDIT 2: No, my code was not correct :) See my answer for details.
First of all, I had a big misconception. I thought that all lines produced by parse-file must be slipped into the map block.

But a Slip is a List that tracks all of its elements. That is why I had a big memory leak.

The solution is to create the gather-take sequence inside map but consume it outside map.
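A minimal sketch of that idea, assuming `.flat` is the step that consumes the sequences between the two maps (the use of `.flat` here is my assumption, not something shown above). Each `parse-file` call returns its lazy `Seq` as-is, with no `slip`, and `.flat` pulls lines from those sequences one at a time:

```raku
sub MAIN ( *@file-names ) {
    # The first map only constructs one lazy Seq per file.
    # .flat (an assumption in this sketch) consumes those Seqs lazily,
    # so no Slip holding all lines of a file is ever built.
    @file-names.map( { parse-file( $_ ) } ).flat.map( { process-line( $_ ) } );
}

sub parse-file ( $file-name ) {
    return gather for $file-name.IO.lines -> $line {
        take $line if $line ~~ /a/; # dummy example logic
    }
}

sub process-line ( $line ) {
    say $line; # dummy example logic
}
```

The only change from the code in the question is dropping `slip` from the first block and adding `.flat` between the two `map` calls.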