Loss of data integrity when consuming incoming FIX market messages using ActionBlock and/or Channel?


I use EPAM's FIX engine in an application to communicate via FIX. Incoming FIX messages arrive through an inherited void OnNewMessage(FixMessage msg) method. After a quick check or two to decide whether an incoming msg is of interest, I want to hand it off to a worker (thread pool?) thread that does the CPU-bound calculations.

The GitHub repo notes: "The IFixSessionListener.OnNewMessage() method must not be dead-locked or perform time-demanding operations because it will lock all message processing for this session. A user should execute time consuming handling operations in separate threads and return control as soon as possible." I therefore want to offload the message as quickly as possible.

The problem is an error saying that Tag 268 does not exist, although it does exist: I can see it in the incoming FIX message log.

I started testing with ActionBlock and Channel to fix this loss of data integrity, which I presume is caused by threading (Task.Run). As I understand it, ActionBlock and Channel provide built-in thread safety for parallel or concurrent producers and consumers.

Channel code:

// Fields
private readonly Channel<FixMessagePlusID> _fixChannel;
private readonly ChannelReader<FixMessagePlusID> reader;
private readonly ChannelWriter<FixMessagePlusID> writer;
private readonly CancellationTokenSource _ctSource;
private readonly CancellationToken _token;
private int id = 0; // message counter, used in OnNewMessage

// Initialisation (e.g. in the constructor)
_ctSource = new CancellationTokenSource();
_token = _ctSource.Token;
_fixChannel = Channel.CreateUnbounded<FixMessagePlusID>();
reader = _fixChannel.Reader;
writer = _fixChannel.Writer;

// Two consumers read from the same channel
Task consumerOne = StartConsumingAsync();
Task consumerTwo = StartConsumingAsync();

public async void OnNewMessage(FixMessage fixMessage)
{
    int repeating = fixMessage.GetTagAsInt(268);
    Console.WriteLine("Tag 268 capture as: " + repeating.ToString() + " MsgID: "  + id.ToString());
    if(StartUsingData)
    {               
        await writer.WriteAsync(new FixMessagePlusID(fixMessage,id), _token);
        id++;
        if(id > 10){ StartUsingData = false; }  
    }
}
public async Task StartConsumingAsync()
{
    await foreach (var fixMessagePlusID in reader.ReadAllAsync(_token))
    {
        FixMessage fixMessage = fixMessagePlusID.FixMessage;
        if(fixMessage.IsTagExists(268))
        {
            int repeating = fixMessage.GetTagAsInt(268);
            if(repeating == 1)
            {                               
                string symbol = fixMessage.GetTagValueAsString(55);
                if(IWantToUseThisMessage)
                {
                    // Do long running CPU bound task:
                    DoWork();
                }
            }   
        }
        else
        {
            Console.WriteLine("Msg Tag268 FAIL, MsgID: " + fixMessagePlusID.ID);
        }
    }
}

EDIT:

See the updated code example and console result screenshot below. Tag 268 IS found before it is written to the Channel.

[Screenshot: console output]
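
One possible explanation, stated here purely as an assumption and not confirmed by the engine's documentation, is that the engine reuses the same message instance after OnNewMessage returns. If so, a consumer reading the queued reference later would see cleared or overwritten contents, even though the tag was present when the message was written to the Channel. The sketch below reproduces that hazard with a hypothetical MutableMessage class (not the real FixMessage and not any EPAM API) and shows that a snapshot taken before enqueueing is unaffected.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for an engine-owned message that gets reused.
// This is NOT the real FixMessage; it only models mutable, shared state.
public sealed class MutableMessage
{
    private readonly Dictionary<int, string> _tags = new Dictionary<int, string>();

    public void Set(int tag, string value) => _tags[tag] = value;
    public void Clear() => _tags.Clear();
    public bool HasTag(int tag) => _tags.ContainsKey(tag);

    // Returns a snapshot that shares no state with the reused instance.
    public MutableMessage Copy()
    {
        var copy = new MutableMessage();
        foreach (var pair in _tags) copy.Set(pair.Key, pair.Value);
        return copy;
    }
}

public static class Program
{
    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<(int Id, MutableMessage ByRef, MutableMessage Snapshot)>();

        // One instance reused for every "incoming" message (the assumed engine behaviour).
        var shared = new MutableMessage();

        for (int id = 0; id < 3; id++)
        {
            shared.Clear();
            shared.Set(268, "1");

            // Enqueue both the shared reference and an independent snapshot.
            // TryWrite succeeds on an unbounded channel that has not been completed.
            channel.Writer.TryWrite((id, shared, shared.Copy()));
        }

        // The "engine" recycles the instance before any consumer has run.
        shared.Clear();
        channel.Writer.Complete();

        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine(
                $"MsgID {item.Id}: by reference has 268 = {item.ByRef.HasTag(268)}, " +
                $"snapshot has 268 = {item.Snapshot.HasTag(268)}");
        }
    }
}
```

In this sketch every queued reference reports False for tag 268 while every snapshot reports True. If the real engine behaves the same way, copying the message (or extracting the needed tag values into a plain object) inside OnNewMessage before writing to the Channel would be the thing to try. Whether FixMessage offers a suitable copy method is something to check in the library itself.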
