There is a JSON-RPC API, which I'm currently implementing. It can be tested here.
The problem is that if an incorrect DTO model is passed to SendAsync<TResponse>
, JsonSerializer.Deserialize
is going to throw a JsonException
, which is not handled by my code. I know I've got to use SetException
in some way, but I don't know how to do it, so here is the question. The exception message should be printed in the console as well.
public sealed class Client : IDisposable
{
...
private readonly ConcurrentDictionary<long, IResponseHandler> _handlers = new();
...
public Task StartAsync(CancellationToken cancellationToken)
{
_ = Task.Run(async () =>
{
await foreach (var message in _client.Start(cancellationToken))
{
using var response = JsonDocument.Parse(message);
try
{
var requestId = response.RootElement.GetProperty("id").GetInt32();
// TODO: Handle JsonException errors via `SetException`?
// TODO: Show error when incorrect input parameters are filled
if (_handlers.ContainsKey(requestId))
{
_handlers[requestId].SetResult(message);
_handlers.TryRemove(requestId, out _);
}
}
catch (KeyNotFoundException)
{
// My point is that a message should be processed only if it doesn't include `id`,
// because that means that the message is an actual web socket subscription.
_messageReceivedSubject.OnNext(message);
}
}
}, cancellationToken);
...
return Task.CompletedTask;
}
public Task<TResponse> SendAsync<TResponse>(string method, object @params)
{
var request = new JsonRpcRequest<object>
{
JsonRpc = "2.0",
Id = NextId(),
Method = method,
Params = @params
};
//var tcs = new TaskCompletionSource<TResponse>();
//_requestManager.Add(request.Id, request, tcs);
var handler = new ResponseHandlerBase<TResponse>();
_handlers[request.Id] = handler;
var message = JsonSerializer.Serialize(request);
_ = _client.SendAsync(message);
return handler.Task;
//return tcs.Task;
}
public async Task<AuthResponse?> AuthenticateAsync(string clientId, string clientSecret)
{
var @params = new Dictionary<string, string>
{
{"grant_type", "client_credentials"},
{"client_id", clientId},
{"client_secret", clientSecret}
};
var response = await SendAsync<SocketResponse<AuthResponse>>("public/auth", @params).ConfigureAwait(false);
return response.Result;
}
...
private interface IResponseHandler
{
void SetResult(string payload);
}
private class ResponseHandlerBase<TRes> : IResponseHandler
{
private readonly TaskCompletionSource<TRes> _tcs = new();
public Task<TRes> Task => _tcs.Task;
public void SetResult(string payload)
{
var result = JsonSerializer.Deserialize(payload, typeof(TRes));
_tcs.SetResult((TRes) result);
}
}
}
Coincidentally, I did something very similar while live-coding a TCP/IP chat application last week.
Since in this case you already have an
IAsyncEnumerable<string>
- and since you can get messages other than responses - I recommend also exposing thatIAsyncEnumerable<string>
:You can change this to be Rx-based if you want (
_messageReceivedSubject.OnNext
), but I figure if you already haveIAsyncEnumerable<T>
, then you may as well keep the same abstraction.Then, you can parse and detect responses, passing along all other messages:
Then, you can define your collection of outstanding requests and handle messages that are for those requests:
Note the usage of
ConcurrentDictionary.TryRemove
, which is safer than accessing the value and then removing it.Now you can write your general
SendAsync
. As I note in my video, I prefer to split up the code that runs synchronously inSendAsync
and the code thatawait
s the response:I've removed the "handler" concept completely, since it was just providing the type for
JsonSerializer.Deserialize
. Also, by using a localasync
method, I can use theasync
state machine to propagate exceptions naturally.Then, your higher-level methods can be built on this:
So the final code ends up being:
You may also want to check out David Fowler's Project Bedrock, which may simplify this code quite a bit.