Python: use asyncio to hit an API and output .csv files


I'm trying to work out how to rewrite some code asynchronously. I have to download ~7500 datasets from an API and write them to .csv files. Here is a reproducible example (assuming you have a free API key for Alpha Vantage):

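import os

from alpha_vantage.timeseries import TimeSeries
import pandas as pd

api_key = ""

def get_ts(symbol):
    # download the full daily adjusted history for one symbol as a DataFrame
    ts = TimeSeries(key=api_key, output_format='pandas')
    data, meta_data = ts.get_daily_adjusted(symbol=symbol, outputsize='full')
    # write it to ./data_dump/<symbol>_data.csv, creating the folder if needed
    os.makedirs("./data_dump", exist_ok=True)
    fname = "./data_dump/{}_data.csv".format(symbol)
    data.to_csv(fname)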

symbols = ['AAPL', 'GOOG', 'TSLA', 'MSFT']

for s in symbols:
    get_ts(s)

The people who made the alpha_vantage API wrote an article on using it with asyncio here, but I'm not sure if I should make two functions for pulling the data and writing the csv like here.

I haven't used asyncio before, so any pointers would be greatly appreciated - just looking to make my download time take less than 3 hours if possible!

Edit: The other caveat is I'm helping a researcher with this so we are using Jupyter notebooks - see their caveat for asyncio here.

Best answer

Without changing your function get_ts, it might look like this:

import multiprocessing

# PROCESSES = multiprocessing.cpu_count()
PROCESSES = 4  # number of parallel processes
CHUNKS = 6  # number of symbols handed to a process per task

# sample of the ~7.5k symbols
TICKERS = ["BCDA", "WBAI", "NM", "ZKIN", "TNXP", "FLY", "MYSZ", "GASX", "SAVA", "GCE",
           "XNET", "SRAX", "SINO", "LPCN", "XYF", "SNSS", "DRAD", "WLFC", "OILD", "JFIN",
           "TAOP", "PIC", "DIVC", "MKGI", "CCNC", "AEI", "ZCMD", "YVR", "OCG", "IMTE",
           "AZRX", "LIZI", "ORSN", "ASPU", "SHLL", "INOD", "NEXI", "INR", "SLN", "RHE-PA",
           "MAX", "ARRY", "BDGE", "TOTA", "PFMT", "AMRH", "IDN", "OIS", "RMG", "IMV",
           "CHFS", "SUMR", "NRG", "ULBR", "SJI", "HOML", "AMJL", "RUBY", "KBLMU", "ELP"]

# split the flat list into sublists of CHUNKS symbols each
TICKERS = [TICKERS[i:i + CHUNKS] for i in range(0, len(TICKERS), CHUNKS)]


def download_data(pool_id, symbols):
    # pool_id is the 1-based index of the sublist, used only for logging
    for symbol in symbols:
        print("[{:02}]: {}".format(pool_id, symbol))
        # do the actual work here, e.g.:
        # get_ts(symbol)


if __name__ == "__main__":
    with multiprocessing.Pool(PROCESSES) as pool:
        pool.starmap(download_data, enumerate(TICKERS, start=1))
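
As a quick check of how the chunking and the enumerate/starmap pairing fit together, here is a minimal sketch that runs without the pool or the API. The helper name chunked and the five-symbol list are only illustrative, they are not part of the code above:

```python
def chunked(items, size):
    """Split items into consecutive sublists of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


symbols = ["AAPL", "GOOG", "TSLA", "MSFT", "NRG"]
chunks = chunked(symbols, 2)

# enumerate(..., start=1) yields (index, sublist) pairs; pool.starmap unpacks
# each pair into the two arguments of download_data(pool_id, symbols).
tasks = list(enumerate(chunks, start=1))
print(tasks)
# [(1, ['AAPL', 'GOOG']), (2, ['TSLA', 'MSFT']), (3, ['NRG'])]
```

The last sublist is simply shorter when the number of symbols is not a multiple of the chunk size.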

Similar question here.

In this example, I split the list of tickers into sublists so that each process retrieves data for several symbols per task, which limits the overhead of handing work to the worker processes.
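
Since the question asks about asyncio specifically, here is a rough sketch of that route, again without changing get_ts. It is not the approach from the article mentioned in the question: it just runs a blocking function in the default thread pool and caps concurrency with a semaphore, on the assumption that the work is dominated by waiting on the network. fetch_and_save is a hypothetical stand-in for get_ts so the snippet runs without an API key, and max_concurrent=4 is an arbitrary value you would tune to whatever rate limit applies to your key.

```python
import asyncio
import time


def fetch_and_save(symbol):
    """Hypothetical stand-in for the blocking get_ts(symbol) call."""
    time.sleep(0.05)  # simulates network and disk latency
    return symbol


async def download_all(symbols, max_concurrent=4):
    loop = asyncio.get_running_loop()
    semaphore = asyncio.Semaphore(max_concurrent)

    async def worker(symbol):
        # at most max_concurrent workers get past this point at once
        async with semaphore:
            # run the blocking function in the loop's default thread pool
            return await loop.run_in_executor(None, fetch_and_save, symbol)

    # gather returns the results in the same order as the input symbols
    return await asyncio.gather(*(worker(s) for s in symbols))


if __name__ == "__main__":
    results = asyncio.run(download_all(["AAPL", "GOOG", "TSLA", "MSFT"]))
    print(results)
```

In a Jupyter notebook an event loop is already running, so asyncio.run() raises a RuntimeError there; call `results = await download_all(symbols)` directly in a cell instead.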