Gunicorn over Flask sending Parallel requests to same route


I am running a Flask application (few users, organization level) on a high-end machine with 72 cores and 515703 MB (~515 GB) of RAM.

I am currently running my application as (no nginx):

gunicorn --workers=10 --threads=12 -b 0.0.0.0:5000 "run:app" --access-logfile user_activity_logs/user_activity.log --preload

i.e. 10 × 12 = 120 threads in total (with headroom to grow, since the usual guideline allows roughly 2*72 + 1 workers).
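For reference, the same settings can also live in a gunicorn.conf.py instead of the command line (a sketch only; the worker and thread counts mirror my command above and are values to tune, not recommendations):

```python
# gunicorn.conf.py -- sketch mirroring the command-line flags above
bind = "0.0.0.0:5000"
workers = 10          # independent worker processes
threads = 12          # threads per worker, so up to 120 concurrent requests
preload_app = True    # load the app once, then fork workers
accesslog = "user_activity_logs/user_activity.log"
```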

There is a paginate API that needs to send a large amount of data, approximately 60-70 MB of rows from a CSV, in small chunks of 1-2 MB each.
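The server-side slicing helper referenced later as paginate_array is not shown here; a minimal sketch of what such a helper might look like, assuming a 1-based page number (my real version operates on a DataFrame with the same start/stop arithmetic):

```python
def paginate_array(rows, page_size, page):
    """Return the slice of rows for a 1-based page number."""
    start = (page - 1) * page_size
    return rows[start:start + page_size]

# Example: 7 rows split into pages of 3
rows = list(range(7))
print(paginate_array(rows, 3, 1))  # first page: [0, 1, 2]
print(paginate_array(rows, 3, 3))  # last, partial page: [6]
```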

For this, I send n requests (one per page) in parallel from the frontend. On some dates, n is around 25-30 pages or even more.

From the network tab and htop, what I see is that the initial 4 requests are fast and parallel, but later requests slow down and follow a pattern: roughly 4 requests run in parallel at a time while the rest stay pending.

Frontend Code:

const page_size = 20000;

async function fetchInitDetails() {
    return await fetchApi('/paginate-refdata/init', {
        "method": "GET",
        "data": {
            'date': $('#date').val(),
            'exchange': $('#exchange').val(),
            'page_size': page_size
        }
    })
}

async function fetchAllPages(initDetails) {
    let columns = [];
    for (let i = 0; i < show_columns.length; i++) {
        if (initDetails.columns.includes(show_columns[i])) {
            if(show_columns[i] == 'expiry_date') {
                columns.push({
                    headerName: titleCase(show_columns[i]),
                    field: show_columns[i],
                    filter: 'agTextColumnFilter',
                    minWidth: 150,
                    sort: 'desc', 
                    sortIndex: 0
                })
            } else {
                columns.push({
                    headerName: titleCase(show_columns[i]),
                    field: show_columns[i],
                    filter: 'agTextColumnFilter',
                    minWidth: 150,
                })
            }
        }
    }

    for (let i = 0; i < initDetails.columns.length; i++) {
        if (!show_columns.includes(initDetails.columns[i])) {
            columns.push({
                headerName: titleCase(initDetails.columns[i]),
                field: initDetails.columns[i],
                filter: 'agTextColumnFilter',
                minWidth: 150,
                hide: true
            })
        }
    }

    grid.ref_db.gridOptions.columnDefs = columns;

    const { total_pages } = initDetails;
    const parallelRequests = Array.from({ length: total_pages }, (_, index) => index + 1);
    let pages = 1;
    // (data.total - data.cnt) / 15000 * 10
    const responses = await Promise.all(parallelRequests.map(async (page) => {
        const response = await fetchApi('/paginate-refdata', {
            "method": "GET",
            "data": {
                'date': $('#date').val(),
                'exchange': $('#exchange').val(),
                'page_size': page_size,
                'page': page
            }
        });
        show_loader(pages / initDetails.total_pages * 100, (initDetails.total_pages - pages++) * 1.2, false); // JS has no keyword arguments, so pass the flag positionally
        return response;
    }));

    return responses;
}

fetchInitDetails()
.then(initDetails => fetchAllPages(initDetails))
.then(responses => {
    const rows_data = responses.flatMap(response => response['data']);
    grid['ref_db'].renderGrid(rows_data);
    Swal.fire({
        toast: true,
        position: 'bottom',
        icon: 'success',
        title: 'Data loaded',
        showConfirmButton: false,
        timer: 5000,
    });
})
.catch(error => {
    grid['ref_db'].renderGrid([]);
})
.finally(() => {
    onclose();
})

Backend Code:

@blueprint.route('/paginate-refdata')
@login_required
def paginate_refdata():
    try:
        date = request.args['date'].replace('-', '')
        exchange = request.args['exchange'].lower()
        page = int(request.args['page'])
        page_size = int(request.args.get('page_size', '10'))
    except (KeyError, ValueError):
        return jsonify({ 'msg': 'Please provide date, exchange & page', 'status': False }), 400

    refdata_df = get_or_load_refdata_from_cache(exchange, date)

    total_pages = -(-len(refdata_df) // page_size)  # ceiling division, avoids an extra empty page when evenly divisible
    curr_page = paginate_array(refdata_df, page_size, page).to_dict('records')
    result = {
        'page': page,
        'total_pages': total_pages,
        'page_size': len(curr_page),
        'total': len(refdata_df),
        'data': curr_page,
    }
    
    response = Response(gzip.compress(json.dumps(result).encode('utf-8')), content_type='application/json')
    response.headers['Content-Encoding'] = 'gzip'
    return response
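The manual gzip step in the handler can be checked in isolation; a small standalone round-trip sketch (outside Flask, with a made-up result payload):

```python
import gzip
import json

# Made-up payload shaped like the endpoint's response
result = {'page': 1, 'total_pages': 3, 'data': [{'a': 1}] * 1000}
raw = json.dumps(result).encode('utf-8')
compressed = gzip.compress(raw)

# Repetitive row data compresses well, which keeps each chunk small on the wire
print(len(compressed) < len(raw))
assert json.loads(gzip.decompress(compressed).decode('utf-8')) == result
```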

I want this web server to utilise the machine's resources and process all 30 requests in parallel. Currently it takes around 2-3 minutes, apparently using only 4 workers, whereas I expect it to finish in under 10 seconds, since a single request takes around 3 seconds.

Can anyone help me figure out what I am missing or doing wrong?

EDIT :

I tested with ab and the server does seem concurrent, as all worker processes now show non-zero CPU% in htop:

seq 1 300 | xargs -n 1 -P 300 -I {} ab -n 1 -c 1 "10.40.1.56:5000/…{}&page_size=10000"

but from the browser, only 6 requests initially run in parallel (per the logs), and the remaining requests arrive incrementally.
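Browsers typically cap concurrent HTTP/1.1 connections at about 6 per origin, which may explain this exact pattern: the server is not the bottleneck, the client is. If I drive the requests from outside the browser with an explicit concurrency limit, the behaviour becomes predictable. A Python sketch of that batching idea (fetch_page is a stand-in for the real HTTP call to /paginate-refdata, not actual network code):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch_page(page):
    # Stand-in for the real HTTP request to /paginate-refdata
    time.sleep(0.01)
    return {'page': page, 'data': []}

def fetch_all_pages(total_pages, batch_size=6):
    # Run at most batch_size requests at a time, like the browser does
    with ThreadPoolExecutor(max_workers=batch_size) as pool:
        return list(pool.map(fetch_page, range(1, total_pages + 1)))

responses = fetch_all_pages(30)
print(len(responses))  # 30 responses, in page order
```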
