Does ChannelReader<T>.ReadAllAsync
throw any exceptions when being canceled by a CancellationToken? It doesn't seem to be throwing OperationCanceledException/TaskCanceledException?
I know if these two methods were called in a fire and forget manner, i.e. _ = SendLoopAsync(); _ = ReceiveLoopAsync();
, it would've crashed the task with no displayed message/exception because they were not awaited, meaning that we're losing the exceptions.
I don't want it to crash that task without letting me know that it actually has crashed/been cancelled, which means I should probably wrap the whole SendLoopAsync in a try/catch instead of what's between ReadAllAsync's branches.
A small example representing its behavior will be appreciated.
var clientWebSocket = new ClientWebSocket();
await clientWebSocket.ConnectAsync(new Uri("wss://www.deribit.com/ws/api/v2"), CancellationToken.None).ConfigureAwait(false);
var client = new ChannelWebSocket(clientWebSocket);
for (var i = 1; i <= 10; i++)
{
client.Output.TryWrite($"Item: {i}");
}
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(1));
await client.StartAsync(cts.Token).ConfigureAwait(false); // blocks the UI
Console.ReadLine();
public class ChannelExample
{
private readonly WebSocket _webSocket;
private readonly Channel<string> _input;
private readonly Channel<string> _output;
public ChannelExample(WebSocket webSocket)
{
_webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));
_input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleWriter = true
});
_output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleReader = true
});
}
public ChannelReader<string> Input => _input.Reader;
public ChannelWriter<string> Output => _output.Writer;
public async Task StartAsync(CancellationToken cancellationToken)
{
var receiving = ReceiveLoopAsync(cancellationToken);
var sending = SendLoopAsync(cancellationToken);
var completedTask = await Task.WhenAny(receiving, sending).ConfigureAwait(false);
if (completedTask.Exception != null)
{
Console.WriteLine("Exception");
}
}
private async Task SendLoopAsync(CancellationToken cancellationToken)
{
await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
{
Console.WriteLine($"Sending: {message}");
await Task.Delay(5000, cancellationToken).ConfigureAwait(false);
}
}
private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
{
using var buffer = MemoryPool<byte>.Shared.Rent();
while (_webSocket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested)
{
ValueWebSocketReceiveResult receiveResult;
do
{
receiveResult = await _webSocket.ReceiveAsync(buffer.Memory, cancellationToken).ConfigureAwait(false);
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
return;
}
} while (!receiveResult.EndOfMessage);
}
}
}
I suspect that it would throw; of course, you can always test that, but - that is the general expected pattern in this scenario. So you would wrap it with a:
Alternatively: you could pass
CancellationToken.None
into the channel read API, and just use the writer's completion to signify exit (making sure that you call.Complete(...)
on the writer when exiting).That said:
ReadAllAsync
probably isn't the preferred API here, since you don't really need it asIAsyncEnumerable<T>
- so it may be preferable to use the native channel API, i.e.