I use EPAM's FIX engine in an application that communicates via FIX. Incoming FIX messages arrive through an inherited void OnNewMessage(FixMessage msg) method. From there I want to consume the msg by offloading it to a worker (thread pool?) thread that does CPU-bound calculations, after a quick check or two to decide whether the incoming msg is of interest.
The GitHub repo includes this note: "The IFixSessionListener.OnNewMessage() method must not be dead-locked or perform time-demanding operations because it will lock all message processing for this session. A user should execute time consuming handling operations in separate threads and return control as soon as possible." Therefore I want to offload the work as fast as possible.
The problem is that I get an error saying Tag 268 does not exist, when in reality it does exist: I can see it in the incoming FIX message log.
I started testing with ActionBlock and Channel to try to fix this loss of data integrity, which I presume is caused by threading (Task.Run). As I understand it, ActionBlock and Channel are there to provide built-in thread safety for parallel or concurrent processing.
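To illustrate the kind of data loss I suspect, here is a minimal, single-threaded sketch. PooledMessage, Snapshot and ReuseDemo are hypothetical names made up for this illustration; they are not part of the FIX engine. The sketch assumes, without confirmation, that the producer reuses the same message instance once the callback returns. Under that assumption, a consumer holding only a reference would see the tag disappear, while one holding an independent copy would not.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for FixMessage; NOT the engine's real type.
public sealed class PooledMessage
{
    private readonly Dictionary<int, string> _tags = new Dictionary<int, string>();

    public void Set(int tag, string value) => _tags[tag] = value;
    public bool IsTagExists(int tag) => _tags.ContainsKey(tag);
    public void Clear() => _tags.Clear();

    // Independent copy that is unaffected by later reuse of the original.
    public PooledMessage Snapshot()
    {
        var copy = new PooledMessage();
        foreach (var pair in _tags) copy._tags[pair.Key] = pair.Value;
        return copy;
    }
}

public static class ReuseDemo
{
    // Simulates a callback that enqueues the message, after which the
    // producer recycles the same instance (assumed behaviour, not confirmed).
    public static bool TagSurvives(bool snapshotBeforeEnqueue)
    {
        var queue = new Queue<PooledMessage>();
        var shared = new PooledMessage();

        shared.Set(268, "1");
        queue.Enqueue(snapshotBeforeEnqueue ? shared.Snapshot() : shared);
        shared.Clear(); // producer reuses the instance after the callback returns

        // The "consumer" runs later and checks for the tag.
        return queue.Dequeue().IsTagExists(268);
    }
}
```

Calling ReuseDemo.TagSurvives(false) returns false because the queued reference points at the cleared instance, while ReuseDemo.TagSurvives(true) returns true because the snapshot was taken before the reuse. Whether the real engine behaves this way is exactly what I am unsure about.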
Channel code:
    // Fields
    private readonly Channel<FixMessagePlusID> _fixChannel;
    private readonly ChannelReader<FixMessagePlusID> reader;
    private readonly ChannelWriter<FixMessagePlusID> writer;
    private readonly CancellationTokenSource _ctSource;
    private readonly CancellationToken _token;

    // Initialization (in the constructor)
    _ctSource = new CancellationTokenSource();
    _token = _ctSource.Token;
    _fixChannel = Channel.CreateUnbounded<FixMessagePlusID>();
    reader = _fixChannel.Reader;
    writer = _fixChannel.Writer;

    // Message counter used to tag each message written to the channel
    int id = 0;

    // Two consumers reading from the same channel
    Task consumerOne = StartConsumingAsync();
    Task consumerTwo = StartConsumingAsync();

    public async void OnNewMessage(FixMessage fixMessage)
    {
        // Tag 268 is read successfully here, before the message is written to the channel.
        int repeating = fixMessage.GetTagAsInt(268);
        Console.WriteLine("Tag 268 capture as: " + repeating.ToString() + " MsgID: " + id.ToString());
        if (StartUsingData)
        {
            await writer.WriteAsync(new FixMessagePlusID(fixMessage, id), _token);
            id++;
            if (id > 10) { StartUsingData = false; }
        }
    }

    public async Task StartConsumingAsync()
    {
        await foreach (var fixMessagePlusID in reader.ReadAllAsync(_token))
        {
            FixMessage fixMessage = fixMessagePlusID.FixMessage;
            if (fixMessage.IsTagExists(268))
            {
                int repeating = fixMessage.GetTagAsInt(268);
                if (repeating == 1)
                {
                    string symbol = fixMessage.GetTagValueAsString(55);
                    if (IWantToUseThisMessage)
                    {
                        // Do long running CPU bound task:
                        DoWork();
                    }
                }
            }
            else
            {
                // This is the branch that reports the missing tag.
                Console.WriteLine("Msg Tag268 FAIL, MsgID: " + fixMessagePlusID.ID);
            }
        }
    }
EDIT:
See the updated code example and console result screenshot below. Tag 268 IS found before it is written to the Channel.
