How to manage multithreading and batch reading in an Ocelot load balancer with a custom load balancer

95 Views Asked by At

I have a worker that sends batch requests to an API that sits behind my custom load balancer in Ocelot. In this custom load balancer I want to distribute 70% of every batch of requests to HostAndPort1 and 30% to the other one. The final goal is to send 70% of the total requests to FirstApi and 30% to SecondApi. The problem is that it does not distribute the way I want; it only works when I have just one thread.

This is the worker code that sends requests to the Ocelot API:

   /// <summary>
   /// Reads one batch of unsent rows, posts it to the REST service in parallel chunks,
   /// and schedules a bulk update for the items returned by every successful response.
   /// </summary>
   /// <returns>The number of rows read for this batch (0 when nothing was pending).</returns>
   public async ValueTask<long> DoReadAsync()
    {
        OnProcess = true;
        var count = 0;

        try
        {
            var sends = (await _unitOfWork.SendRepository.Where(
                x => x.sndPost == false,
                x => new Send
                {
                    sndID = x.sndID,
                    SndBody = x.SndBody,
                    SndTo = x.SndTo,
                    sndFrom = x.sndFrom,
                    SndFarsi = x.SndFarsi,
                    sndMsgClass = x.sndMsgClass,
                    sndUDH = x.sndUDH
                },
                c => c.OrderBy(o => o.SndPriority),
                BaseConfig.BatchRead)).ToList();

            count = sends.Count;

            if (count == 0)
            {
                // Nothing pending: wait for the configured refresh interval before returning.
                await Task.Delay(_settings.RefreshMillisecond);
                return count;
            }

            // One POST per chunk; all chunks are in flight at the same time.
            var restTasks = BaseConfig.Split(sends)
                .Select(items => _restService.DoPostAsync(items.ToList()))
                .ToList();

            var updateTasks = new List<Task>();

            // Handle responses in completion order so updates start as early as possible.
            while (restTasks.Count > 0)
            {
                var task = await Task.WhenAny(restTasks);
                restTasks.Remove(task);

                // A faulted POST rethrows here and is handled by the outer catch.
                var item = await task.ConfigureAwait(false);
                if (item is { IsSuccessStatusCode: true })
                {
                    var content = await item.Content.ReadAsStringAsync();
                    var items = JsonConvert.DeserializeObject<List<ResponseDto>>(content);

                    // DeserializeObject returns null for an empty or "null" payload;
                    // treat that as "nothing to update" instead of failing the batch.
                    if (items != null && items.Count > 0)
                    {
                        var itemsSends = items.Select(_mapper.Map<Send>).ToList();
                        updateTasks.Add(_unitOfWork.SendRepository.BulkUpdateForwardOcelotAsync(itemsSends));
                    }
                }
            }

            await Task.WhenAll(updateTasks).ConfigureAwait(false);
            Completed = true;
        }
        catch (Exception ex)
        {
            // Pass the exception object so the logger records the type and stack trace;
            // the rendered message text is still ex.Message.
            _logger.LogError(ex, "{Message}", ex.Message);
        }
        finally
        {
            // Single exit point for the busy flag, whatever path the method took.
            OnProcess = false;
        }

        return count;
    }

In the above code, for example, we have a batch of 50000; I split it into 10 tasks of 5000 requests each and send them to the Ocelot API.

And this is my code in the Ocelot API; I have written a middleware like this:

public class BatchMiddleware : OcelotMiddleware
{
    private readonly RequestDelegate _next;

    // Convention-based middleware is constructed once and shared by every request,
    // so the alternation state below must only be touched while holding this lock.
    private readonly object _gate = new object();
    private bool isRahyabRG = true;
    private int remainedBatch = 0;

    public BatchMiddleware(
        RequestDelegate next,
        IConfiguration configuration,
        IOcelotLoggerFactory loggerFactory) : base(loggerFactory.CreateLogger<BatchMiddleware>())
    {
        _next = next;
    }

    /// <summary>
    /// Rewrites the incoming batch body before passing the request on. Requests alternate
    /// between two modes: one forwards the first 70% of its items, the next forwards as many
    /// items as the session value "remainedBatchKey" says were left over.
    /// </summary>
    /// <remarks>
    /// NOTE(review): in both modes the items that are not forwarded are simply dropped from
    /// this request; confirm that the caller re-sends them.
    /// NOTE(review): nothing visible here ties this alternation to the load balancer's own
    /// rotation; confirm the two stay in step under concurrent requests.
    /// </remarks>
    public async Task Invoke(HttpContext httpContext)
    {
        var request = httpContext.Request;
        var batchRequests = await request.DeserializeArrayAsync<RequestDto>();
        var batchRequestCount = batchRequests.Count;
        var rgCount = (int)Math.Floor(70 * batchRequestCount / 100.0);
        var leftover = batchRequestCount - rgCount;

        // Read and flip the mode atomically so two concurrent requests can never
        // both see the same value of the flag.
        bool sendToRg;
        lock (_gate)
        {
            sendToRg = isRahyabRG;
            isRahyabRG = !sendToRg;
            if (sendToRg)
            {
                remainedBatch = leftover;
            }
        }

        List<RequestDto> forwarded;
        if (sendToRg)
        {
            forwarded = batchRequests.Take(rgCount).ToList();
            httpContext.Session.SetString("remainedBatchKey", leftover.ToString());
        }
        else
        {
            // A missing or malformed session value yields an empty batch rather than an exception.
            var stored = httpContext.Session.GetString("remainedBatchKey");
            forwarded = int.TryParse(stored, out var remaining)
                ? batchRequests.Take(remaining).ToList()
                : new List<RequestDto>();
        }

        ReplaceBody(request, forwarded);

        await _next.Invoke(httpContext);
    }

    // Swaps the request body for the JSON of the given items.
    private static void ReplaceBody(HttpRequest request, List<RequestDto> items)
    {
        var payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(items));
        request.Body = new MemoryStream(payload);

        // Keep an existing Content-Length in line with the shortened body.
        if (request.ContentLength.HasValue)
        {
            request.ContentLength = payload.Length;
        }
    }

And this is my custom load balancer:

  /// <summary>
  /// Round-robin load balancer: each lease returns the next service in the list,
  /// wrapping back to the first one after the last.
  /// </summary>
  public class MyRoundRobin : ILoadBalancer
{
    private readonly Func<Task<List<Service>>> _services;
    private readonly object _lock = new();

    // Index of the service to hand out on the next lease.
    private int _last;

    public MyRoundRobin(Func<Task<List<Service>>> services, IConfiguration configuration)
        => _services = services;

    /// <summary>Picks the next service in rotation and returns its host and port.</summary>
    public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
    {
        var candidates = await _services();
        Service chosen;

        lock (_lock)
        {
            // Wrap around once the cursor has run past the end of the list.
            var index = _last < candidates.Count ? _last : 0;
            _last = index + 1;
            chosen = candidates[index];
        }

        return new OkResponse<ServiceHostAndPort>(chosen.HostAndPort);
    }

    /// <summary>No-op: this balancer keeps no per-lease state to release.</summary>
    public void Release(ServiceHostAndPort hostAndPort)
    {
    }
}

}

And this is ocelot.json:

 {
"Routes": [

    {
        "DownstreamPathTemplate": "/api/Forward",
        "DownstreamScheme": "http",
        "DownstreamHostAndPorts": [

            {
                
                "Host": "localhost",
                "Port": 51003
            },
           
            {
                
                "Host": "localhost",
                "Port": 32667

            }

        ],
        "UpstreamPathTemplate": "/",
        "UpstreamHttpMethod": [
            "POST"
        ],


        "LoadBalancerOptions": {
            "Type": "MyRoundRobin"

        }
       
    }
]

}

1

There is 1 best solution below

6
Eldar On BEST ANSWER

The code you shared is a sample from the Ocelot documentation, and it is a basic round-robin load balancer. As I understand from the code, the ILoadBalancer instance is created for every request, so the fields that hold shared information (_lock and _last) should be static. You can achieve a weighted round-robin with the help of random numbers: as the number of requests increases, the distribution will approximate the expected ratio (70% / 30%). You can do this:

/// <summary>
/// Weighted random load balancer: each lease goes to the first service with
/// probability 0.7 and to the second service otherwise.
/// </summary>
public class MyRoundRobin: ILoadBalancer
{
    // Probability that a lease is routed to the first service in the list.
    private const double FirstServiceShare = 0.7;

    private readonly Func<Task<List<Service>>> _services;
    
    public MyRoundRobin(Func<Task<List<Service>>> services, IConfiguration configuration)
    {
        _services = services;
    }

    /// <summary>Randomly picks the first or second service according to the 70/30 weighting.</summary>
    public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
    {
        var services = await _services();

        // Beware that Random.Shared was added in .NET 6 onwards for thread safety.
        var whatToPick = Random.Shared.NextDouble() >= FirstServiceShare ? 1 : 0;

        // If only one service is available, use it instead of indexing past the end of the list.
        var next = services[Math.Min(whatToPick, services.Count - 1)];

        return new OkResponse<ServiceHostAndPort>(next.HostAndPort);
    }

    /// <summary>No-op: this balancer keeps no per-lease state to release.</summary>
    public void Release(ServiceHostAndPort hostAndPort)
    {
    }
}

If you do not want to implement a solution that involves random numbers you can try the below solution:

/// <summary>
/// Deterministic weighted load balancer: out of every 10 consecutive leases, 7 go to the
/// first service and 3 to the second, interleaved so that short bursts keep the same ratio.
/// </summary>
public class MyRoundRobin : ILoadBalancer
{
    // Length of one scheduling cycle and the number of its slots given to the second service.
    private const int CycleLength = 10;
    private const int SecondServiceSlots = 3;

    private readonly Func<Task<List<Service>>> _services;

    // Static so the rotation is shared by every instance of the balancer.
    private static readonly object _lock = new();
    private static ulong counter;
    
    public MyRoundRobin(Func<Task<List<Service>>> services, IConfiguration configuration)
    {
        _services = services;
    }

    /// <summary>Returns the service that owns the current slot of the 7:3 cycle.</summary>
    public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
    {
        var services = await _services();

        lock (_lock)
        {
            var slot = (int)(counter % CycleLength);
            counter = unchecked(counter + 1);

            // The second service gets a slot each time its running share of the cycle
            // crosses a whole number: slots 3, 6 and 9 of 0..9. This spreads its 30%
            // evenly instead of sending 70 requests to one host and then 30 to the other.
            var pickSecond =
                (slot + 1) * SecondServiceSlots / CycleLength > slot * SecondServiceSlots / CycleLength;
            var whatToPick = pickSecond ? 1 : 0;

            // If only one service is available, use it instead of indexing past the end of the list.
            var next = services[Math.Min(whatToPick, services.Count - 1)];

            return new OkResponse<ServiceHostAndPort>(next.HostAndPort);
        }
    }

    /// <summary>No-op: this balancer keeps no per-lease state to release.</summary>
    public void Release(ServiceHostAndPort hostAndPort)
    {
    }
}