I'm trying to split an incoming Akka stream of bytes (from the body of an http request, but it could also be from a file) into multiple files of a defined size.
For example, if I'm uploading a 10 GB file, it would create something like 10 files of 1 GB each. The files would have randomly generated names. My issue is that I don't really know where to start, because all the answers and examples I've read either store the whole chunk in memory or split on a string delimiter. But I can't really hold 1 GB "chunks" in memory and then just write them to disk.
Is there an easy way to perform that kind of operation? My only idea would be to use something like http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Chunking_up_a_stream_of_ByteStrings_into_limited_size_ByteStrings but transformed into something like FlowShape[ByteString, File], writing the chunks to a file myself until the max file size is reached, then creating a new file, and so on, and streaming back the created files. That looks like an atrocious idea that doesn't use Akka properly.
Thanks in advance
I often revert to purely functional, non-Akka techniques for problems such as this and then "lift" those functions into Akka constructs. By this I mean I try to use only Scala "stuff" and then wrap that stuff inside of Akka later on.
File Creation
We start with the FileOutputStream creation, based on "randomly generated names".
State
There needs to be some way of storing the "state" of the current file, e.g. the number of bytes already written.
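The two steps above (file creation and state) might be sketched together as follows. This is only an illustration: the UUID-based naming scheme, the `.part` suffix, the `dir` parameter and the names `randomFile` and `newFileState` are assumptions, since the question does not say how names are generated. The byte count is a `Long` so that limits above 2 GB do not overflow.

```scala
import java.io.{File, FileOutputStream}
import java.util.UUID

object FileStateSketch {
  // Hypothetical naming scheme: a random UUID inside a caller-supplied directory.
  def randomFile(dir: File): File =
    new File(dir, s"${UUID.randomUUID()}.part")

  // State of the file currently being written: bytes written so far,
  // the file itself, and the open stream that appends to it.
  final case class FileState(byteCount: Long, file: File, out: FileOutputStream)

  // Opens a fresh, empty, randomly named file and wraps it in an initial state.
  def newFileState(dir: File): FileState = {
    val file = randomFile(dir)
    FileState(0L, file, new FileOutputStream(file))
  }
}
```

Keeping the `File` next to the stream is a design choice of this sketch; it makes it possible to report the created files afterwards.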
File Writing
First we determine whether writing the given ByteString would breach the maximum file size threshold.
We then have to write the function that creates a new FileState if we've maxed out the capacity of the current one, or returns the current state if it is still below capacity.
The only thing left is to write to the FileOutputStream.
Fold On Any GenTraversableOnce
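The function that gets folded in this section is assembled from the three File Writing steps above. Here is a minimal sketch, assuming akka-actor is on the classpath for `ByteString`. Only `writeToChunkedFile` and `FileState` are named in the text; `isEndOfChunk`, `currentFileState`, `writeAndReturn` and the `newState` factory parameter are illustrative names, and `FileState` is redeclared so the block stands alone.

```scala
import java.io.{File, FileOutputStream}
import akka.util.ByteString

object FileWritingSketch {
  final case class FileState(byteCount: Long, file: File, out: FileOutputStream)

  // True if appending `bytes` would push the current file past `maxFileSize`.
  def isEndOfChunk(maxFileSize: Long)(state: FileState, bytes: ByteString): Boolean =
    state.byteCount + bytes.length > maxFileSize

  // Closes a full file and opens the next one via `newState` (a hypothetical
  // factory for a fresh, randomly named file); otherwise keeps the current state.
  def currentFileState(maxFileSize: Long, newState: () => FileState)(
      state: FileState, bytes: ByteString): FileState =
    if (isEndOfChunk(maxFileSize)(state, bytes)) {
      state.out.close()
      newState()
    } else state

  // Appends the bytes to the open file and returns the state with the new count.
  def writeAndReturn(state: FileState, bytes: ByteString): FileState = {
    state.out.write(bytes.toArray[Byte])
    state.copy(byteCount = state.byteCount + bytes.length)
  }

  // The first parameter list fixes the configuration; what remains has the
  // shape (FileState, ByteString) => FileState that a left fold expects.
  def writeToChunkedFile(maxFileSize: Long, newState: () => FileState)(
      state: FileState, bytes: ByteString): FileState =
    writeAndReturn(currentFileState(maxFileSize, newState)(state, bytes), bytes)
}
```

This sketch never splits a ByteString, so files end on element boundaries and can come out slightly under the limit. An element larger than the limit is written whole into a fresh file, and if it arrives first it leaves an empty file behind.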
In Scala a GenTraversableOnce is any collection, parallel or not, that has the fold operators. These include Iterator, Vector, Array, Seq, Scala Stream, ... The final writeToChunkedFile function matches the signature expected by GenTraversableOnce#foldLeft (plain fold requires the accumulator and the elements to share a type, so foldLeft is the variant that fits here).
One final loose end: the last FileOutputStream needs to be closed as well. Since the fold will only emit that last FileState, we can close that one.
Akka Streams
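Before lifting anything into a stream, the plain-collection fold and the final close from the previous section can be captured in one small helper. This is a sketch only: `foldAndClose` is a hypothetical name, `S` stands for FileState and `A` for ByteString, and `Iterator` is used as the common denominator because every Scala collection offers `.iterator`. It uses `foldLeft`, since the accumulator and the element types differ.

```scala
object FoldSketch {
  // Threads the state through every element with `step`, then passes the
  // single final state to `close` (e.g. to close the last FileOutputStream).
  def foldAndClose[S, A](items: Iterator[A])(zero: S)(step: (S, A) => S)(close: S => Unit): Unit =
    close(items.foldLeft(zero)(step))
}
```

Because nothing here depends on Akka, the helper can be exercised in a unit test with ordinary values, for instance integers and a sum.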
Akka Flow gets its fold from FlowOps#fold, which has the same shape as GenTraversableOnce#foldLeft. Therefore we can "lift" our regular functions into stream values, similar to the way we used them with the Iterable fold.
The nice part about handling the problem with regular functions is that they can be used within other asynchronous frameworks beyond streams, e.g. Futures or Actors. You also don't need an Akka ActorSystem in unit testing, just regular language data structures.
You can then use the resulting Sink to drain the HttpEntity coming from an HttpRequest.
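The lifting step described above could look like the following sketch, assuming akka-stream is on the classpath. `chunkerSink` and its three parameters are hypothetical names: `zero` stands for the initial FileState, `step` for the writeToChunkedFile function with its configuration already applied, and `close` for whatever closes the last FileOutputStream.

```scala
import akka.Done
import akka.stream.scaladsl.{Flow, Keep, Sink}
import akka.util.ByteString
import scala.concurrent.Future

object StreamSketch {
  // Folds every incoming ByteString into the state; when the upstream
  // completes, the single final state is emitted and handed to `close`.
  // The materialized Future completes once that has happened.
  def chunkerSink[S](zero: S)(step: (S, ByteString) => S)(close: S => Unit): Sink[ByteString, Future[Done]] =
    Flow[ByteString]
      .fold(zero)(step)
      .toMat(Sink.foreach[S](close))(Keep.right)
}
```

With akka-http available, draining a request body would then be along the lines of `request.entity.dataBytes.runWith(chunkerSink(initialState)(step)(close))`, where the three arguments are the stand-ins described above. Note that if the upstream fails, the fold emits nothing, so this sketch would leave the last file open; handling that case is left out here.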