Why is this Lwt-based and seemingly concurrent code so inconsistent?


I am trying to create concurrent examples of Lwt and came up with this little sample

open Lwt.Infix

let () =
  Lwt_main.run (
      let start = Unix.time () in
      Lwt_io.open_file ~mode:Lwt_io.Input "/dev/urandom" >>= fun data_source ->
      Lwt_unix.mkdir "serial" 0o777 >>= fun () ->
      Lwt_list.iter_p
        (fun count ->
         let count = string_of_int count in
         Lwt_io.open_file
           ~flags:[Unix.O_RDWR; Unix.O_CREAT]
           ~perm:0o777
           ~mode:Lwt_io.Output ("serial/file" ^ count ^ ".txt") >>= fun h ->
         Lwt_io.read ~count:52428800 data_source >>= Lwt_io.write_line h)
        [0;1;2;3;4;5;6;7;8;9] >>= fun () ->
      let finished = Unix.time () in
      Lwt_io.printlf "Execution time took %f seconds" (finished -. start))

EDIT: When I was asking for 50 GB, the question was: "However this is incredibly slow and basically useless. Does the inner bind need to be forced somehow?"

EDIT: I had originally written this asking for 50 GB and it never finished. Now I have a different problem when asking for 50 MB: execution is nearly instantaneous, and du -sh reports a directory size of only 80K.

EDIT: I have also tried explicitly closing the file handles, with the same bad result.

I am on the latest version of OS X and compile with

ocamlfind ocamlopt -package lwt.unix main.ml -linkpkg -o Test

(I have also tried /dev/random, yes I'm using wall-clock time.)

BEST ANSWER

So, your code has some issues.

Issue 1

The main issue is that you have misunderstood the Lwt_io.read function (and nobody can blame you!).

val read : ?count : int -> input_channel -> string Lwt.t
  (** [read ?count ic] reads at most [len] characters from [ic]. It
      returns [""] if the end of input is reached. If [count] is not
      specified, it reads all bytes until the end of input. *)

When ~count:len is specified, it will read at most len characters. "At most" means that it can read less. But if the count option is omitted, then it will read all data until the end of input. I personally find this behavior unintuitive, if not weird. So "at most" means up to len or less; i.e., there is no guarantee that it will read exactly len bytes. And indeed, if you add a check to your program:

 Lwt_io.read ~count:52428800 data_source >>= fun data ->
 Lwt_io.printlf "Read %d bytes" (String.length data) >>= fun () ->
 Lwt_io.write h data >>= fun () ->

You will see that it reads only 4096 bytes per call:

Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes

Why 4096? Because this is the default buffer size. But it actually doesn't matter.
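If you actually need exactly N bytes, one option is to loop, accumulating chunks until the requested amount has been read or the input ends. A minimal sketch; read_exactly is a made-up helper, not part of Lwt_io:

```ocaml
open Lwt.Infix

(* Hypothetical helper: keep calling Lwt_io.read until [n] bytes have
   accumulated, or the channel reports end of input by returning "". *)
let read_exactly ic n =
  let buf = Buffer.create n in
  let rec loop remaining =
    if remaining <= 0 then Lwt.return (Buffer.contents buf)
    else
      Lwt_io.read ~count:remaining ic >>= function
      | "" -> Lwt.return (Buffer.contents buf)  (* end of input *)
      | chunk ->
          Buffer.add_string buf chunk;
          loop (remaining - String.length chunk)
  in
  loop n
```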

Issue 2

The Lwt_io module implements buffered IO. That means your writes and reads do not go directly to a file, but are buffered in memory; you must therefore remember to flush and close. Your code doesn't close the descriptors when it finishes, so you can end up with buffers left unflushed when the program terminates. Lwt_io in particular flushes all its buffers before program exit, but you shouldn't rely on this undocumented behavior: it may bite you in the future when you try some other buffered IO layer, such as fstreams from the standard C library. So always close your files. (Another reason is that file descriptors are a precious resource, and their leakage is very hard to track down.)
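One way to make closing hard to forget is to wrap it in Lwt.finalize, which runs the cleanup whether the body succeeds or fails; Lwt_io.close flushes the buffer before closing the descriptor. A sketch (with_output_file is an invented name; Lwt_io.with_file already packages the same pattern):

```ocaml
open Lwt.Infix

(* Invented helper: open the file, hand the channel to [f], and always
   flush and close afterwards, even if [f]'s promise fails. *)
let with_output_file path f =
  Lwt_io.open_file ~mode:Lwt_io.Output path >>= fun oc ->
  Lwt.finalize (fun () -> f oc) (fun () -> Lwt_io.close oc)
```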

Issue 3

Don't use /dev/urandom or /dev/random to measure IO. For the former you will measure the performance of the random number generator; for the latter you will measure the flow of entropy into your machine. Both are quite slow. Depending on the speed of your CPU, you will rarely get more than 16 MB/s, which is much less than Lwt can push through. Reading from /dev/zero and writing to /dev/null performs all transfers in memory, and will show the actual speed achievable by your program. A well-written program will still be bounded by the kernel's speed. In the example program provided below, this shows an average speed of 700 MB/s.
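For illustration, here is a rough in-memory throughput measurement along those lines, copying from /dev/zero to /dev/null. The helper is my own sketch; actual figures depend entirely on the machine:

```ocaml
open Lwt.Infix

(* Copy [total] bytes from /dev/zero to /dev/null and print MB/s.
   All transfers stay in memory, so this measures the IO stack itself,
   not a disk or an entropy source. *)
let measure total =
  Lwt_io.with_file ~mode:Lwt_io.Input "/dev/zero" (fun ic ->
      Lwt_io.with_file ~mode:Lwt_io.Output "/dev/null" (fun oc ->
          let start = Unix.gettimeofday () in
          let rec copy left =
            if left <= 0 then Lwt.return ()
            else
              Lwt_io.read ~count:(min left 65536) ic >>= fun chunk ->
              Lwt_io.write oc chunk >>= fun () ->
              copy (left - String.length chunk)
          in
          copy total >>= fun () ->
          let secs = Unix.gettimeofday () -. start in
          Lwt_io.printlf "%.1f MB/s" (float_of_int total /. 1e6 /. secs)))
```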

Issue 4

Don't use buffered IO if you're really striving for performance; you will never reach the maximum with it. For example, Lwt_io.read first reads into an internal buffer, then allocates a string and copies the data into it. If you really need performance, you should provide your own buffering. In most cases there is no need, as Lwt_io is quite performant, but if you need to process dozens of megabytes per second, or need some special buffering policy (something non-linear), you may have to think about supplying your own. The good news is that Lwt_io allows you to do this. You can take a look at an example program that measures the performance of Lwt input/output; it mimics the well-known pv program.
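As a sketch of what "your own buffering" could look like, here is a copy loop built directly on Lwt_unix file descriptors with a single caller-owned Bytes buffer, avoiding Lwt_io's intermediate string copies (copy_unbuffered and write_all are invented names):

```ocaml
open Lwt.Infix

(* Invented helper: write the whole slice, looping over partial writes. *)
let rec write_all fd buf pos len =
  if len = 0 then Lwt.return ()
  else
    Lwt_unix.write fd buf pos len >>= fun n ->
    write_all fd buf (pos + n) (len - n)

(* Invented helper: copy [src] to [dst] through one reused buffer,
   with no per-chunk string allocation. *)
let copy_unbuffered src dst buf_size =
  let buf = Bytes.create buf_size in
  let rec loop () =
    Lwt_unix.read src buf 0 buf_size >>= fun n ->
    if n = 0 then Lwt.return ()  (* end of input *)
    else write_all dst buf 0 n >>= loop
  in
  loop ()
```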

Issue 5

You're expecting to gain performance by running threads in parallel. The problem is that there is no room for concurrency in your test. /dev/random (as well as /dev/zero) is a single device, bounded only by the CPU; reading it is much like calling a random function. It is always ready, so no system call will ever block on it. Writing to a regular file is also not a good place for concurrency. First of all, there is usually only one hard drive, with one write head in it. Even if a system call blocks and yields control to another thread, this results in performance degradation, as the two threads now compete for the head position. If you have an SSD there is no competition for a head, but performance will still be worse, as you will spoil your caches. And, fortunately, writing to regular files usually doesn't block, so your threads will run one after another; that is, they will be serialized.

SECOND ANSWER

If you look at your files, you'll see they're each 4097 bytes: that's 4096 bytes read from /dev/urandom, plus one byte for the newline. You're hitting a buffer maximum inside Lwt_io.read, so even though you say ~count:awholelot, it only gives you ~count:4096.

I don't know what the canonical Lwt way to do this is, but here's one alternative:

open Lwt

let stream_a_little source n =
    let left = ref n in
    Lwt_stream.from (fun () ->
        if !left <= 0 then return None
        else
          Lwt_io.read ~count:!left source >>= (fun s ->
            if s = "" then return None    (* end of input *)
            else begin
              left := !left - (String.length s);
              return (Some s)
            end))

let main () =
    Lwt_io.open_file ~buffer_size:(4096*8) ~mode:Lwt_io.Input "/dev/urandom" >>= fun data_source ->
    Lwt_unix.mkdir "serial" 0o777 >>= fun () ->
    Lwt_list.iter_p
      (fun count ->
        let count = string_of_int count in
        Lwt_io.open_file
          ~flags:[Unix.O_RDWR; Unix.O_CREAT]
          ~perm:0o777
          ~mode:Lwt_io.Output ("serial/file" ^ count ^ ".txt") >>= (fun h ->
            Lwt_stream.iter_s (Lwt_io.write h)
              (stream_a_little data_source 52428800) >>= fun () ->
            Lwt_io.close h))
      [0;1;2;3;4;5;6;7;8;9]

let timeit f =
    let start = Unix.time () in
    f () >>= fun () ->
    let finished = Unix.time () in
    Lwt_io.printlf "Execution time took %f seconds" (finished -. start)

let () =
    Lwt_main.run (timeit main)

EDIT: Note that lwt is a cooperative threading library; when you have two threads going "at the same time", they don't actually do work in your OCaml process at the same time. OCaml is (as of yet) single-core, so when one thread is moving, the others wait nicely until that thread says "OK, I've done some work, you others go". So when you try to stream to 8 files at the same time, you're basically doling out a little randomness to file1, then a little to file2, ... a little to file8, then (if there's still work left to do) a little to file1 again, then a little to file2, etc. This makes sense if you're waiting on lots of input anyway (say your input is coming over the network) and your main process has plenty of time to go through each thread and check "is there any input?", but when all your threads are just reading from /dev/random, it would be much faster to simply fill up the first file, then the second, and so on. And assuming several CPUs could read /dev/(u)random in parallel (and your drive could keep up), it would of course be much faster still to run one read per CPU at the same time, but then you need multicore (or just do this in a shell script).
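A tiny demonstration of that cooperative interleaving, using only Lwt itself: each task records its name, yields to the scheduler with Lwt.pause, and records its name again. Under Lwt_list.iter_p both tasks make progress before either finishes:

```ocaml
open Lwt.Infix

(* Accumulates the order in which the tasks ran, most recent first. *)
let log = ref []

(* Each task logs its name twice, yielding to the scheduler in between. *)
let task name =
  log := name :: !log;
  Lwt.pause () >>= fun () ->
  log := name :: !log;
  Lwt.return ()

let () =
  Lwt_main.run (Lwt_list.iter_p task ["a"; "b"]);
  (* With iter_p the recorded order interleaves "a" and "b"; with
     Lwt_list.iter_s each task would run to completion before the next. *)
  List.iter print_string (List.rev !log)
```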

EDIT2: I've shown how to increase the buffer size on the reader, which ups the speed a bit ;) Note that you can also simply set buffer_size as high as you want in your old example, which will then read it all in one go, but you can't get more than buffer_size bytes unless you read several times.