I want to download files as fast as possible with python.Here is my code
import pandas as pd
import requests
from requests_futures.sessions import FuturesSession
import os
import pathlib
from timeit import default_timer as timer
class AsyncDownloader:
"""Download files asynchronously"""
__urls = set()
__dest_path = None
__user_agent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:58.0) Gecko/20100101 Firefox/58.0'
__read_timeout = 60
__connection_timeout = 30
__download_count = 0 # unlimited
# http://www.browserscope.org/?category=network
__worker_count = 17 # No of threads to spawn
__chunk_size = 1024
__download_time = -1
__errors = []
# TODO Fetch only content of a specific type from a csv
# TODO Improve code structure so that it can be used as a commandline tool
def set_source_csv(self, source_path, column_name):
self.source_path = source_path
self.column_name = column_name
try:
my_csv = pd.read_csv(source_path, usecols=[self.column_name], chunksize=10)
except ValueError:
print("The column name doesn't exist")
return
else:
# No exception whatsoever
for chunk in my_csv:
AsyncDownloader.__urls.update(set(getattr(chunk, self.column_name)))
def set_destination_path(self, dest_path):
if dest_path.endswith('/'):
dest_path = dest_path[:-1]
self.dest_path = dest_path
# TODO Add exception in case we can't create the directory
pathlib.Path(self.dest_path).mkdir(parents=True, exist_ok=True)
if os.access(self.dest_path, os.W_OK):
AsyncDownloader.__dest_path = pathlib.Path(self.dest_path).resolve()
def set_user_agent(self, useragent):
self.useragent = useragent
AsyncDownloader.__user_agent = self.useragent
def set_connection_timeout(self, ctimeout_secs):
self.timeout_secs = ctimeout_secs
if self.timeout_secs >= 0:
AsyncDownloader.__connection_timeout = self.timeout_secs
def set_read_timeout(self, rtimeout_secs):
self.timeout_secs = rtimeout_secs
if self.timeout_secs >= 0:
AsyncDownloader.__read_timeout = self.timeout_secs
def set_download_count(self, file_count):
self.file_count = file_count
if self.file_count > 0:
AsyncDownloader.__download_count = self.file_count
def set_worker_count(self, worker_count):
self.worker_count = worker_count
if self.worker_count > 0:
AsyncDownloader.__worker_count = self.worker_count
def set_chunk_size(self, chunk_size):
self.chunk_size = chunk_size
if self.chunk_size > 0:
AsyncDownloader.__chunk_size = self.chunk_size
def print_urls(self):
print(AsyncDownloader.__urls)
def get_download_time(self):
return AsyncDownloader.__download_time
def get_errors(self):
return AsyncDownloader.__errors
def download(self):
start = timer()
try:
session = FuturesSession(max_workers=AsyncDownloader.__worker_count)
session.headers.update({'user-agent': AsyncDownloader.__user_agent})
session.request(AsyncDownloader.__connection_timeout,
AsyncDownloader.__connection_timeout, stream=True)
results = []
# Give an accurate file count even if we don't have to download it as it a;ready exist
file_count = 0
for url in AsyncDownloader.__urls:
filename = os.path.basename(url)
# check if we need only a limited number of files
if AsyncDownloader.__download_count != 0:
# No need to download file if it already exist
if pathlib.Path(AsyncDownloader.__dest_path / filename).is_file():
file_count += 1
continue
else:
if file_count < AsyncDownloader.__download_count:
file_count += 1
results.append(session.get(url))
else:
if not pathlib.Path(AsyncDownloader.__dest_path / filename).is_file():
results.append(session.get(url))
for result in results:
# wait for the response to complete, if it hasn't already
response = result.result()
filename = os.path.basename(response.url)
if response.status_code == 200:
with open(pathlib.Path(AsyncDownloader.__dest_path / filename).resolve(), 'wb') as fd:
for chunk in response.iter_content(chunk_size=AsyncDownloader.__chunk_size):
if chunk: # filter out keep-alive new chunks
fd.write(chunk)
end = timer()
AsyncDownloader.__download_time = end - start
except requests.exceptions.HTTPError as errh:
AsyncDownloader.__errors.append("Http Error:" + errh)
# print("Http Error:", errh)
except requests.exceptions.ConnectionError as errc:
AsyncDownloader.__errors.append("Error Connecting:" + errc)
# print("Error Connecting:", errc)
except requests.exceptions.Timeout as errt:
AsyncDownloader.__errors.append("Timeout Error:" + errt)
# print("Timeout Error:", errt)
except requests.exceptions.RequestException as err:
AsyncDownloader.__errors.append("OOps: Something Else" + err)
else:
return
The following code is making a very bad assumption.Indeed i am assuming that the first url will finish first which is of course not correct.
# wait for the response to complete, if it hasn't already
response = result.result()
How can i ensure that only requests that have been completed are processed instead of taking an assumption like above in an efficient manner ?
I would appreciate any other suggestion on how to improve performance.
Kind Regards
Even if the connections were completed in order, you are still processing the files sequentially. The second file must wait for the first one to be written and so on. So, the best thing you can do is processing everything in parallel (this can be done despite the GIL, since io operations like writing to disk and reading from the network will release it). Basically, use regular
requests
library (notrequests-futures
) and create a future/thread per request + file-processing.There are still more ways to make it faster, like keep downloading chunks while writing (i.e two threads, one for request and one for file-processing). And reading chunks in parallel by making
multi-part
requests, which is "download accelerator" territory and you may not want that kind of complexity in your code.Edit: Also, chunked downloads are lazy, which means you are only making the initial requests in parallel, but the actual chunked-file download is being made sequentially, since it's being done in the main thread. So, your current approach is not much better than a fully synchronous one. The above advice still stands.