Running parallel API calls in Python without editing an external library


I have been trying to make my code faster by running parallel requests, with no luck. I am fetching weather data with an external library (https://github.com/pnuu/fmiopendata). Under the hood the library simply uses requests.get() to fetch data from the API. Any tips on how to proceed? I could of course edit the code of fmiopendata, but I would prefer a workaround that does not require refactoring others' code.
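Since the blocking call lives inside the library, one general pattern is to run the library function itself in a thread pool, with no changes to the library. A minimal sketch, using a dummy fetch() that stands in for the blocking download_stored_query() call:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fetch(query):
    # Stand-in for a blocking call such as requests.get() or
    # download_stored_query(); sleeps to simulate network latency.
    time.sleep(0.1)
    return "result for " + query

queries = ["a", "b", "c", "d"]

# The threads run the blocking calls concurrently; executor.map()
# returns the results in the same order as the input queries.
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fetch, queries))

print(results)
```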

Here is some working code, which I would like to edit:

from fmiopendata.wfs import download_stored_query

def parseStartTime(ts, year):
    return str(year) + "-" + ts[0][0] + "-" + ts[0][1] + "T00:00:00Z"


def parseEndTime(ts, year):
    return str(year) + "-" + ts[1][0] + "-" + ts[1][1] + "T23:59:59Z"


def weatherWFS(lat, lon, start_time, end_time):
    # Downloading the observations from the WFS server, using bbox and timestamps for querying
    while True:
        try:
            obs = download_stored_query(
                "fmi::observations::weather::daily::multipointcoverage",
                args=["bbox="+str(lon - 1e-2)+","+str(lat - 1e-2)+","+str(lon + 1e-2)+","+str(lat + 1e-2),
                      "starttime=" + start_time,
                      "endtime=" + end_time])
            if obs.data == {}:
                return False
            else:
                return obs
        except Exception:
            # Retry on transient errors (e.g. network hiccups)
            continue


def getWeatherData(lat, lon):

    StartYear, EndYear = 2011, 2021

    # Handling the data in suitable chunks. Array pairs represent the starting and ending
    # dates of the intervals in ["MM", "dd"] format
    intervals = [
        [["01", "01"], ["03", "31"]],
        [["04", "01"], ["06", "30"]],
        [["07", "01"], ["09", "30"]],
        [["10", "01"], ["12", "31"]]
    ]
    
    # Start and end timestamps are saved in an array

    queries = [[parseStartTime(intervals[i], year),
                parseEndTime(intervals[i], year)]
               for year in range(StartYear, EndYear + 1)
               for i in range(len(intervals))]

    for query in queries:
        
        # This is the request we need to run in parallel processing to save time
        # the obs-objects need to be saved somehow and merged afterwards
        obs = weatherWFS(lat, lon, query[0], query[1])
        
        """ INSERT MAGIC CODE HERE """



lat, lon = 62.6, 29.72
getWeatherData(lat, lon)


There are 2 answers below.


You could try using multiprocessing.Pool. Replace your for query in queries: loop with something like:

import multiprocessing

iterable = zip([lat]*len(queries), [lon]*len(queries), queries)
pool = multiprocessing.Pool(len(queries))
obs_list = pool.starmap(weatherWFS, iterable)
pool.close()
pool.join()

Note that this passes the whole query element as the last argument to weatherWFS, so you should change the function signature accordingly:

def weatherWFS(lat, lon, query):
    start_time = query[0]
    end_time = query[1]

Depending on the length of queries and its elements, you might also choose to unpack queries in your iterable...


Answering myself:

The best solution I found so far is to use concurrent.futures with either the map() or submit() functions.

The solution suggested by Trambi does not improve the execution time, as the requests are not CPU intensive. The bottleneck here is the waiting time, during which the CPU sits idle, so using separate processes does not solve the problem. Multithreading, however, does improve the speed, as threads are much cheaper to create and shut down.

Using the ThreadPoolExecutor in combination with as_completed(), I was able to reduce the execution time by ~15%.

from concurrent.futures import ThreadPoolExecutor, as_completed
from fmiopendata.wfs import download_stored_query

def parseStartTime(ts, year):
    return str(year) + "-" + ts[0][0] + "-" + ts[0][1] + "T00:00:00Z"


def parseEndTime(ts, year):
    return str(year) + "-" + ts[1][0] + "-" + ts[1][1] + "T23:59:59Z"


def weatherWFS(lat, lon, start_time, end_time):
    # Downloading the observations from the WFS server, using bbox and timestamps for querying
    while True:
        try:
            obs = download_stored_query(
                "fmi::observations::weather::daily::multipointcoverage",
                args=["bbox="+str(lon - 1e-2)+","+str(lat - 1e-2)+","+str(lon + 1e-2)+","+str(lat + 1e-2),
                      "starttime=" + start_time,
                      "endtime=" + end_time])
            if obs.data == {}:
                return False
            else:
                return obs
        except Exception:
            # Retry on transient errors (e.g. network hiccups)
            continue



def getWeatherData(lat, lon):

    StartYear, EndYear = 2011, 2021

    # Handling the data in suitable chunks. Array pairs represent the starting and ending
    # dates of the intervals in ["MM", "dd"] format
    intervals = [
        [["01", "01"], ["03", "31"]],
        [["04", "01"], ["06", "30"]],
        [["07", "01"], ["09", "30"]],
        [["10", "01"], ["12", "31"]]
    ]
    
    # Start and end timestamps are saved in an array

    queries = [
        [lat, lon,
         parseStartTime(intervals[i], year),
         parseEndTime(intervals[i], year)]
        for year in range(StartYear, EndYear + 1)
        for i in range(len(intervals))]

    with ThreadPoolExecutor() as executor:
        # Unpack each query into the four arguments of weatherWFS
        observations = [executor.submit(weatherWFS, *query) for query in queries]

        for obs in as_completed(observations):
            obs = obs.result()
            """do stuff with the observations"""


lat, lon = 62.6, 29.72
getWeatherData(lat, lon)
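The """do stuff with the observations""" placeholder glosses over how results completing in arbitrary order are matched back to their queries. One common pattern is to key a dict by future; a sketch with a dummy fetch() in place of weatherWFS:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(start_time, end_time):
    # Stand-in for weatherWFS(); just echoes its arguments.
    return (start_time, end_time)

queries = [("2011-01-01", "2011-03-31"), ("2011-04-01", "2011-06-30")]

with ThreadPoolExecutor(max_workers=4) as executor:
    # Map each future back to its query, so results can be matched
    # up even though they complete in arbitrary order.
    futures = {executor.submit(fetch, *q): q for q in queries}
    results = {}
    for fut in as_completed(futures):
        results[futures[fut]] = fut.result()

print(sorted(results))
```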