rq.timeouts.JobTimeoutException: Task exceeded maximum timeout value (16600 seconds)

I am trying to run a simple long-running task using Redis Queue, but I get a timeout error every time, even though I increased the timeout value with job = q.enqueue(run_scraper, temp_file, job_timeout=16600). No matter what value I pass, it still times out.

Traceback:

01:17:18 Traceback (most recent call last):
  File "/home/zerox/fp-google-search/venv/lib/python3.9/site-packages/rq/worker.py", line 1061, in perform_job
    rv = job.perform()
  File "/home/zerox/fp-google-search/venv/lib/python3.9/site-packages/rq/job.py", line 821, in perform
    self._result = self._execute()
  File "/home/zerox/fp-google-search/venv/lib/python3.9/site-packages/rq/job.py", line 847, in _execute
    coro_result = loop.run_until_complete(result)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 634, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.9/asyncio/base_events.py", line 601, in run_forever
    self._run_once()
  File "/usr/lib/python3.9/asyncio/base_events.py", line 1869, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.9/selectors.py", line 469, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
  File "/home/zerox/fp-google-search/venv/lib/python3.9/site-packages/rq/timeouts.py", line 63, in handle_death_penalty
    raise self._exception('Task exceeded maximum timeout value '
rq.timeouts.JobTimeoutException: Task exceeded maximum timeout value (16600 seconds)

FastAPI code:

import fastapi as _fastapi
from fastapi.responses import HTMLResponse, FileResponse, Response
from starlette.requests import Request
from starlette.templating import Jinja2Templates
import shutil
import os
import json

from rq import Queue
from rq.job import Job

from redis import Redis

from scraper import run_scraper
from utils import clean_file, csv_writer

app = _fastapi.FastAPI()

r = Redis(
    host="localhost",
    port=6379,
    db=0,
)
q = Queue(connection=r)

templates = Jinja2Templates("templates")


@app.get("/")
def index(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/api/v1/scraped_csv")
async def extract_ads(csv_file: _fastapi.UploadFile = _fastapi.File(...)):
    temp_file = _save_file_to_disk(csv_file, path="temp", save_as="temp")
    job = q.enqueue(run_scraper, temp_file, job_timeout=16600)

    return {"message": "Scraping has been started", "job_id": job.id}


@app.get("/progress/{job_id}")
def progress(job_id):
    job = Job.fetch(job_id, connection=r)
    if job.is_finished:
        csv_path = os.path.abspath(clean_file)
        return FileResponse(path=csv_path, media_type="text/csv", filename=clean_file)
    return {"message": "Scraper is running."}


def _save_file_to_disk(uploaded_file, path=".", save_as="default"):
    extension = os.path.splitext(uploaded_file.filename)[-1]
    temp_file = os.path.join(path, save_as + extension)
    with open(temp_file, "wb") as buffer:
        shutil.copyfileobj(uploaded_file.file, buffer)
    return temp_file

I am new to integrating Redis Queue with scraping, so any guidance on solving or handling this timeout error would be much appreciated.

1 Answer

It turns out python-rq does not support asyncio functions directly, so I had to wrap the async entry point in a sync function to get it up and running.
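
On the FastAPI side, the only change is to enqueue that sync wrapper instead of the coroutine function. A minimal sketch of the updated enqueue call (the wrapper get_google_ad_urls is defined at the bottom of the scraper module below); the worker process started with rq worker picks the job up as before:

from scraper import get_google_ad_urls  # sync wrapper around run_scraper

# RQ now runs the job as an ordinary blocking function; asyncio.run()
# inside the wrapper creates and tears down the event loop for the job.
job = q.enqueue(get_google_ad_urls, temp_file, job_timeout=16600)

The scraper module with the wrapper: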

import httpx
import asyncio
from bs4 import BeautifulSoup
from decouple import config
from urllib.parse import urlencode
from urllib.parse import urlparse
import os
import logging

from utils import csv_reader, csv_writer

SCRAPERAPI_KEY = config("API_KEY")
NUM_RETRIES = 3

logging.basicConfig(filename="scraper.log",
                    format='%(asctime)s %(message)s',
                    filemode='w')

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)


base_url = "https://www.google.com/search?"

headers = {
    "authority": "www.google.com",
    "sec-ch-dpr": "1",
    "sec-ch-viewport-width": "1366",
    "sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="98", "Yandex";v="22"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-full-version": '"22.3.3.886"',
    "sec-ch-ua-arch": '"x86"',
    "sec-ch-ua-platform": '"Linux"',
    "sec-ch-ua-platform-version": '"5.4.0"',
    "sec-ch-ua-model": '""',
    "sec-ch-ua-bitness": '"64"',
    "sec-ch-ua-full-version-list": '" Not A;Brand";v="99.0.0.0", "Chromium";v="98.0.4758.886", "Yandex";v="22.3.3.886"',
    "upgrade-insecure-requests": "1",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.141 Safari/537.36",
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
    "sec-fetch-site": "same-origin",
    "sec-fetch-mode": "navigate",
    "sec-fetch-user": "?1",
    "sec-fetch-dest": "document",
    "referer": "https://www.google.com/",
    "accept-language": "en,ru;q=0.9",
}

pagination_params = {
    "q": "lift installation",
    "source": "lnms",
    "tbm": "shop",
    "ei": "aW-HYv74MrCOseMP_8OumAY",
    "start": "",
    "sa": "X",
    "ved": "0ahUKEwikobSm8e33AhUjUGwGHQVfDr84PBDy0wMIpw0",
    "biw": "480",
    "bih": "665",
    "dpr": "1",
}

initial_params = {
    "q": "",
    "source": "lnms",
    "tbm": "shop",
    "sa": "X",
    "ved": "0ahUKEwikobSm8e33AhUjUGwGHQVfDr84PBDy0wMIpw0",
    "biw": "480",
    "bih": "665",
    "dpr": "1",
}

results = []


def get_scraperapi_url(url):
    payload = {
        "api_key": SCRAPERAPI_KEY,
        "url": url,
        "country_code": "au",
        "keep_headers": "true",
    }
    proxy_url = "http://api.scraperapi.com/?" + urlencode(payload)
    return proxy_url


async def log_request(request):
    logger.debug(f"Request: {request.method} {request.url}")


async def log_response(response):
    request = response.request
    logger.debug(f"Response: {request.method} {request.url} - Status: {response.status_code}")


async def fetch_pages(keyword, page_no):
    initial_params["q"] = keyword
    if not page_no:
        params = initial_params
        url = base_url + urlencode(params)
    else:
        params = pagination_params

        params["start"] = str(page_no * 10)

        params["q"] = keyword
        url = base_url + urlencode(params)

    async with httpx.AsyncClient(
        event_hooks={"request": [log_request], "response": [log_response]}
    ) as client:
        response = await client.get(
            get_scraperapi_url(url), headers=headers, timeout=None
        )

        return response


async def parse_page(html):

    ad_urls = []
    content = BeautifulSoup(html, "lxml")

    for ad in content.find_all("a", {"class": "sh-np__click-target"}):
        try:
            async with httpx.AsyncClient() as client:
                r = await client.get(
                    "https://www.google.com" + ad["href"], headers=headers, timeout=None
                )
                url = str(r.url)
                ad_urls.append(urlparse(url).netloc)
                logger.debug(urlparse(url).netloc)
        except Exception:
            # skip ads whose final URL could not be resolved
            pass

    for idx in range(len(ad_urls)):

        results.append({"Ad_Url": ad_urls[idx]})

    return results


async def run_scraper(file_path):
    tasks = []
    kw = await csv_reader(file_path)
    for k in kw:
        for page in range(0, 4):
            for _ in range(NUM_RETRIES):
                try:
                    response = await fetch_pages(k, page)
                    if response.status_code in [200, 404]:
                        break
                except httpx.ConnectError:
                    response = ""
            # guard against the case where every retry raised ConnectError
            if response and response.status_code == 200:
                tasks.append(asyncio.create_task(parse_page(response.content)))

    ad_data = await asyncio.gather(*tasks)

    logger.info('Done!')
    await csv_writer(ad_data[0])
    logger.info('csv created.. Please refresh the page to download the csv.')
    

    return ad_data[0]

def get_google_ad_urls(file_path):
    asyncio.run(run_scraper(file_path))
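
As a side note, the progress endpoint above only checks job.is_finished, so a timed-out job keeps reporting "Scraper is running." forever. A small sketch of a drop-in replacement that also surfaces failures (is_failed and exc_info are standard rq.job.Job attributes), reusing the names from the FastAPI module above:

@app.get("/progress/{job_id}")
def progress(job_id):
    job = Job.fetch(job_id, connection=r)
    if job.is_failed:
        # exc_info holds the traceback recorded by the worker,
        # e.g. the JobTimeoutException shown above
        return {"message": "Scraper failed.", "detail": job.exc_info}
    if job.is_finished:
        csv_path = os.path.abspath(clean_file)
        return FileResponse(path=csv_path, media_type="text/csv", filename=clean_file)
    return {"message": "Scraper is running."}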
