Stream of millions of objects takes too much memory


I'm generating a large number of coordinates (each made of 3 numbers) within a geographical area. However, using Streams (which should be much more memory-efficient than Lists) fills up the app's memory very quickly, as can be seen in this screenshot from Observatory.

[Screenshot: Observatory trace]

I need a structure where events can go in, and be read out one by one, and when this happens, removed from the structure. As far as I understand, that is what a Stream is. When you add a value, the old one is removed.

Unfortunately, this doesn't appear to be happening. Instead, the stream just grows larger and larger, or at least something reading it does, even though all I do is read .length on the returned stream.

Here's the function that starts the Isolate and returns the stream of coordinate tiles. I'll omit the actual generator, as it's not important: it just sends each Coords to the SendPort.

static Stream<Coords<num>> _generateTilesComputer(
    DownloadableRegion region,
  ) async* {
    List<List<double>> serialiseOutline(l) => (l as List)
        .cast<LatLng>()
        .map((e) => [e.latitude, e.longitude])
        .toList();

    final port = ReceivePort();

    final tilesCalc = await Isolate.spawn(
      region.type == RegionType.rectangle
          ? rectangleTiles
          : region.type == RegionType.circle
              ? circleTiles
              : lineTiles,
      {
        'port': port.sendPort,
        'rectOutline': region.type != RegionType.rectangle
            ? null
            : serialiseOutline(region.points),
        'circleOutline': region.type != RegionType.circle
            ? null
            : serialiseOutline(region.points),
        'lineOutline': region.type != RegionType.line
            ? null
            : (region.points as List<List<LatLng>>)
                .chunked(4)
                .map((e) => e.map(serialiseOutline)),
        'minZoom': region.minZoom,
        'maxZoom': region.maxZoom,
        'crs': region.crs,
        'tileSize': region.options.tileSize,
      },
    );

    await for (final Coords<num>? coord in port
        .skip(region.start)
        .take(((region.end ?? double.maxFinite) - region.start).toInt())
        .cast()) {
      if (coord == null) {
        port.close();
        tilesCalc.kill();
        return;
      }
      yield coord;
    }
  }
}

How can I prevent this memory leak? Happy to add more info if needed, but the full source code can be found at https://github.com/JaffaKetchup/flutter_map_tile_caching.


There are 3 best solutions below

0
JaffaKetchup On BEST ANSWER

It turns out the issue was that I was inadvertently storing the output of the stream. I'm not sure how, why, or where, as the code in the question was designed to replace the code that stored these coords and output them as one List, but I guess I didn't get to the root cause.

For others trying to find the answer to this issue: it's not the Stream's fault; you're probably storing the output of the stream somewhere.
For a useful pattern of 'multi-threaded' looping through results and processing them as they arrive, see this file from my library (may be outdated).
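
To illustrate the difference, here is a minimal sketch. `Tile` and `generateTiles` are hypothetical stand-ins for the question's `Coords<num>` and isolate-backed stream, not code from the library. `countTiles` handles each event as it arrives and keeps no reference to it, whereas `collectTiles` keeps every event alive in a List, which is the kind of accidental storage described above.

```dart
import 'dart:async';

/// Hypothetical stand-in for the question's `Coords<num>`.
class Tile {
  const Tile(this.x, this.y, this.z);
  final int x, y, z;
}

/// Hypothetical generator: lazily produces [count] tiles, one at a time.
Stream<Tile> generateTiles(int count) async* {
  for (var i = 0; i < count; i++) {
    yield Tile(i, i, 0);
  }
}

/// Processes each tile as it arrives; no tile is referenced after its
/// loop iteration ends, so it can be garbage-collected.
Future<int> countTiles(Stream<Tile> tiles) async {
  var seen = 0;
  await for (final _ in tiles) {
    seen++;
  }
  return seen;
}

/// Retains every tile: memory grows linearly with the stream's length.
Future<List<Tile>> collectTiles(Stream<Tile> tiles) => tiles.toList();
```

If memory keeps growing with a loop shaped like `countTiles`, it is worth searching the consumer for anything that behaves like `collectTiles`: a `toList()`, a list or map that events are added to, or a closure that captures them.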

1
HoodedUnicorn On

Does this help a bit? It replaces the bottom part of your function.
The .batch method is used to read values in batches of 500, which can be changed to a different value if it is needed.
The count variable is used to keep track of the number of values processed, and when it reaches the limit, the port is closed and the isolate is killed.

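    int count = 0;
    final limit = ((region.end ?? double.maxFinite) - region.start).toInt();
    // NOTE: `batch` is not part of dart:async's Stream. This assumes an
    // extension method that groups events into Lists of up to 500.
    await for (final List<dynamic> batch
        in port.skip(region.start).batch(500)) {
      if (count >= limit) {
        port.close();
        tilesCalc.kill();
        return;
      }
      count += batch.length;
      // Yield tiles one by one so the return type stays Stream<Coords<num>>.
      for (final coord in batch.cast<Coords<num>>()) {
        yield coord;
      }
    }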
  }
}
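
Since `dart:async` does not provide a `batch` method on `Stream`, the snippet above needs one to be supplied. A minimal sketch of what such an extension might look like follows; the name `BatchStream` and its exact behaviour are assumptions for illustration, not an existing API.

```dart
import 'dart:async';

/// Hypothetical extension; `dart:async` has no built-in `batch` method.
extension BatchStream<T> on Stream<T> {
  /// Groups events into lists of up to [size]; the last list may be shorter.
  Stream<List<T>> batch(int size) async* {
    var buffer = <T>[];
    await for (final event in this) {
      buffer.add(event);
      if (buffer.length == size) {
        yield buffer;
        // Start a fresh list so the one already yielded is never mutated.
        buffer = <T>[];
      }
    }
    // Emit any leftover events once the source stream completes.
    if (buffer.isNotEmpty) yield buffer;
  }
}
```

Note that batching holds up to `size` events in memory at once, so on its own it does not reduce memory use. On a `ReceivePort`, whose stream only completes once the port is closed, the trailing partial batch is emitted only after `port.close()`.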

2
Shan On

To force the deletion of values from the stream when they are read out, you can implement a buffer using a StreamController and limit the number of values in the buffer. When the buffer reaches its limit, you can remove the first value in the buffer and add the next one. This will ensure that the memory usage stays under control.

Here's an example implementation:

static Stream<Coords<num>> _generateTilesComputer(
DownloadableRegion region,
) async* {

List<List<double>> serialiseOutline(l) => (l as List)
    .cast<LatLng>()
    .map((e) => [e.latitude, e.longitude])
    .toList();

final port = ReceivePort();
final controller = StreamController<Coords<num>>();

final tilesCalc = await Isolate.spawn(
  region.type == RegionType.rectangle
      ? rectangleTiles
      : region.type == RegionType.circle
          ? circleTiles
          : lineTiles,
  {
    'port': port.sendPort,
    'rectOutline': region.type != RegionType.rectangle
        ? null
        : serialiseOutline(region.points),
    'circleOutline': region.type != RegionType.circle
        ? null
        : serialiseOutline(region.points),
    'lineOutline': region.type != RegionType.line
        ? null
        : (region.points as List<List<LatLng>>)
            .chunked(4)
            .map((e) => e.map(serialiseOutline)),
    'minZoom': region.minZoom,
    'maxZoom': region.maxZoom,
    'crs': region.crs,
    'tileSize': region.options.tileSize,
  },
);

port
    .skip(region.start)
    .take(((region.end ?? double.maxFinite) - region.start).toInt())
    .cast<Coords<num>?>()
    .listen(
  (coord) {
    // A null event is the generator's end-of-data signal.
    if (coord == null) {
      controller.close();
      port.close();
      tilesCalc.kill();
      return;
    }

    // StreamController has no `remove` method: it drops each event once it
    // has been delivered to the listener, so no manual eviction is needed.
    controller.add(coord);
  },
  // Close the controller when `take` ends the stream, so `yield*` completes.
  onDone: controller.close,
);

yield* controller.stream;

}
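
`StreamController` does not expose a `remove` method, so the "drop the oldest value when full" behaviour described in this answer would need a separate data structure. A minimal sketch using `ListQueue` from `dart:collection` follows; the `BoundedBuffer` class is hypothetical and not part of any library.

```dart
import 'dart:collection';

/// Hypothetical fixed-capacity buffer that evicts the oldest value when full.
class BoundedBuffer<T> {
  BoundedBuffer(this.capacity) : assert(capacity > 0);

  final int capacity;
  final ListQueue<T> _items = ListQueue<T>();

  int get length => _items.length;
  bool get isEmpty => _items.isEmpty;

  /// Adds [value]. Returns the evicted oldest value, or null if the buffer
  /// still had room and nothing was dropped.
  T? add(T value) {
    final evicted = _items.length >= capacity ? _items.removeFirst() : null;
    _items.addLast(value);
    return evicted;
  }

  /// Removes and returns the oldest buffered value.
  T removeFirst() => _items.removeFirst();
}
```

Be aware that evicting values means losing them: for a tile download, every dropped coordinate is a tile that never gets processed, so this only suits cases where skipping data is acceptable.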