I have a producer-consumer scenario¹ based on a bounded Channel<T>.
Channel<Item> channel = Channel.CreateBounded<Item>(10);
The items are coming from an RDBMS, to which the producer connects and fetches them one by one. The peculiarity is that I don't have the luxury of keeping the DB connection alive for the whole lifetime of the producer. I have to close the connection when the channel is full, and reopen the connection when the channel has again space available for a new item. So I implemented the producer like this:
// Producer
while (true)
{
    await channel.Writer.WaitToWriteAsync();
    connection.Open();
    Item item;
    while (true)
    {
        item = connection.GetNextItem(); // This is fast
        if (!channel.Writer.TryWrite(item)) break;
    }
    connection.Close();
    await channel.Writer.WriteAsync(item);
}
The producer waits until the channel.Writer.WaitToWriteAsync() task completes, then opens the DB connection, writes as many items into the channel as it can until one is rejected, closes the DB connection, asynchronously writes the rejected item, and loops back to waiting.
The consumer is pretty standard:
// Consumer
await foreach (Item item in channel.Reader.ReadAllAsync())
{
    // Process the item (this is slow)
}
My problem with this design is that the DB connection is opened and closed too often. Both opening and closing the connection have a non-negligible overhead, so I would like to minimize how frequently they happen. Although the capacity of the channel is 10, I would prefer the WaitToWriteAsync task to complete when the channel is half-full (5 items), not immediately when the stored items drop from 10 to 9.
My question is: How can I modify my producer so that it connects to the database when there are 5 or fewer items in the channel, and closes the connection when the channel is full with 10 items?
Below is the output from a minimal example that I wrote, that reproduces the undesirable behavior:
19:20:55.811 [4] > Opening connection -->
19:20:55.933 [4] > Produced #1
19:20:55.934 [4] > Produced #2
19:20:55.934 [4] > Produced #3
19:20:55.934 [4] > Produced #4
19:20:55.934 [4] > Produced #5
19:20:55.934 [4] > Produced #6
19:20:55.935 [4] > Produced #7
19:20:55.935 [4] > Produced #8
19:20:55.935 [4] > Produced #9
19:20:55.935 [4] > Produced #10
19:20:55.935 [4] > Produced #11
19:20:55.935 [4] > Closing connection <--
19:20:55.936 [6] > Consuming: 1
19:20:56.037 [4] > Consuming: 2
19:20:56.037 [6] > Opening connection -->
19:20:56.137 [6] > Produced #12
19:20:56.137 [6] > Produced #13
19:20:56.137 [6] > Closing connection <--
19:20:56.137 [4] > Consuming: 3
19:20:56.238 [6] > Consuming: 4
19:20:56.238 [4] > Opening connection -->
19:20:56.338 [4] > Produced #14
19:20:56.338 [4] > Produced #15
19:20:56.338 [4] > Closing connection <--
19:20:56.338 [6] > Consuming: 5
19:20:56.439 [4] > Consuming: 6
19:20:56.439 [6] > Opening connection -->
19:20:56.539 [6] > Produced #16
19:20:56.539 [6] > Produced #17
19:20:56.539 [6] > Closing connection <--
19:20:56.539 [4] > Consuming: 7
19:20:56.644 [6] > Consuming: 8
19:20:56.644 [4] > Opening connection -->
19:20:56.744 [4] > Produced #18
19:20:56.745 [7] > Consuming: 9
19:20:56.745 [4] > Produced #19
19:20:56.745 [4] > Produced #20
19:20:56.745 [4] > Closing connection <--
19:20:56.846 [7] > Consuming: 10
19:20:56.847 [4] > Producer completed
19:20:56.946 [4] > Consuming: 11
19:20:57.046 [4] > Consuming: 12
19:20:57.147 [4] > Consuming: 13
19:20:57.247 [4] > Consuming: 14
19:20:57.347 [4] > Consuming: 15
19:20:57.452 [4] > Consuming: 16
19:20:57.552 [4] > Consuming: 17
19:20:57.653 [4] > Consuming: 18
19:20:57.753 [4] > Consuming: 19
19:20:57.854 [4] > Consuming: 20
19:20:57.955 [1] > Finished
As you can see there is a lot of "Opening/Closing connection" going on.
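For anyone who wants to observe the same pattern locally, a reproduction along these lines might look like the sketch below. It is not the original minimal example: the class name ReconnectRepro, the RunAsync signature, the use of plain integers as items, and the 100 ms delays (loosely based on the timestamps in the log above) are all assumptions. The simulated "connection" is just a delay and two console messages.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical reproduction, not the original example from the question.
public static class ReconnectRepro
{
    // Runs the producer/consumer pair and returns how many times
    // the simulated connection was opened.
    public static async Task<int> RunAsync(
        int totalItems = 20, int capacity = 10, int delayMs = 100)
    {
        Channel<int> channel = Channel.CreateBounded<int>(capacity);
        int fetched = 0; // items "fetched from the DB" so far
        int opened = 0;  // number of simulated connection openings

        Task producer = Task.Run(async () =>
        {
            while (fetched < totalItems)
            {
                await channel.Writer.WaitToWriteAsync();
                opened++;
                Console.WriteLine("Opening connection -->");
                await Task.Delay(delayMs); // assumed cost of opening

                int item = 0;
                bool rejected = false;
                while (fetched < totalItems)
                {
                    item = ++fetched; // simulated fast fetch
                    Console.WriteLine($"Produced #{item}");
                    if (!channel.Writer.TryWrite(item)) { rejected = true; break; }
                }

                Console.WriteLine("Closing connection <--");
                // The rejected item is written once a slot frees up.
                if (rejected) await channel.Writer.WriteAsync(item);
            }
            channel.Writer.Complete();
            Console.WriteLine("Producer completed");
        });

        await foreach (int consumed in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine($"Consuming: {consumed}");
            await Task.Delay(delayMs); // assumed slow processing
        }

        await producer;
        return opened;
    }
}
```

Calling `await ReconnectRepro.RunAsync()` should show the connection being reopened after only one or two items have been consumed, although the exact interleaving depends on timing and will vary between runs.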
My question has similarities with this older question:
Given an external producer API that can be stopped and started, efficiently stop the producer when local buffer is full.
The difference is that in my case the producer is a loop, and not the event handler of a service, as in the other question.
¹ This scenario is contrived. It was inspired by a relatively recent GitHub API proposal.
Clarification: The channel should not be drained completely before reconnecting to the DB. That's because opening the connection takes some time, and I don't want the consumer to be idle during this time. So the producer should reconnect when the channel has dropped to 5 items or less, not when it is completely empty.
One way to solve this problem is to use two channels, a bounded Channel<T> with the desirable capacity, and a second bounded Channel<int> with capacity 1 that is used only for the WaitToWriteAsync functionality. Synchronizing the two channels is not trivial, so I wrote a custom Channel<T> implementation that wraps these two channels, and does the synchronization internally:
The lowCapacity affects only the WaitToWriteAsync method.
The DoubleCapacityChannel<T> class can solve the problem in the question by changing the line:
to
Below is the output from the original minimal example, modified to use the DoubleCapacityChannel<T>:
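To make the low-watermark idea concrete, here is a minimal sketch that is deliberately simpler than the DoubleCapacityChannel<T> described above, and is not a reproduction of it. Instead of a second channel it uses a lock and a TaskCompletionSource. The class name LowWatermarkChannel<T> and all of its members are hypothetical. It assumes a single producer and a single consumer, relies on the Count property that bounded channel readers support, does not derive from Channel<T>, and its WaitToWriteAsync supports neither cancellation nor channel completion.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch: a bounded channel whose WaitToWriteAsync completes
// only when the number of stored items is at or below lowCapacity.
public sealed class LowWatermarkChannel<T>
{
    private readonly Channel<T> _items;
    private readonly int _lowCapacity;
    private readonly object _gate = new object();
    private TaskCompletionSource<bool>? _waiter; // pending WaitToWriteAsync, if any

    public LowWatermarkChannel(int highCapacity, int lowCapacity)
    {
        if (highCapacity < 1)
            throw new ArgumentOutOfRangeException(nameof(highCapacity));
        if (lowCapacity < 0 || lowCapacity > highCapacity)
            throw new ArgumentOutOfRangeException(nameof(lowCapacity));
        _items = Channel.CreateBounded<T>(highCapacity);
        _lowCapacity = lowCapacity;
    }

    // Writes behave exactly like the inner bounded channel.
    public bool TryWrite(T item) => _items.Writer.TryWrite(item);

    public ValueTask WriteAsync(T item, CancellationToken cancellationToken = default)
        => _items.Writer.WriteAsync(item, cancellationToken);

    public void Complete() => _items.Writer.Complete();

    // Completes immediately if the channel is at or below the low watermark,
    // otherwise when the consumer has drained it down to that level.
    public Task WaitToWriteAsync()
    {
        lock (_gate)
        {
            if (_items.Reader.Count <= _lowCapacity) return Task.CompletedTask;
            _waiter ??= new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            return _waiter.Task;
        }
    }

    public async IAsyncEnumerable<T> ReadAllAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (T item in _items.Reader.ReadAllAsync(cancellationToken))
        {
            ReleaseWaiterIfLow(); // the item has already left the inner channel
            yield return item;
        }
    }

    private void ReleaseWaiterIfLow()
    {
        TaskCompletionSource<bool>? waiter = null;
        lock (_gate)
        {
            if (_waiter != null && _items.Reader.Count <= _lowCapacity)
            {
                waiter = _waiter;
                _waiter = null;
            }
        }
        waiter?.TrySetResult(true);
    }
}
```

With this sketch, the producer from the question would create `new LowWatermarkChannel<Item>(10, 5)` and call WaitToWriteAsync, TryWrite and WriteAsync on it directly rather than through channel.Writer, while the consumer would iterate `channel.ReadAllAsync()`. Both the count check and the release of the waiter happen under the same lock, so a read that crosses the watermark cannot slip between the producer's check and its registration as a waiter.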