Python multithreading session with MySQL pooling

import requests
import pymysql
from dbutils.pooled_db import PooledDB
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from tenacity import retry, TryAgain, stop_after_attempt

pool = PooledDB(creator=pymysql,
                maxconnections=0,  # 0 means no limit on the number of connections
                autocommit=True,
                host='localhost',
                user='someuser',
                passwd='somepwd',
                database='database')

# NOTE: a single connection and cursor are shared by every thread
db = pool.connection()
cur = db.cursor()
url_dict = {'key1': 'url1',
            'key2': 'url2'}

@retry(stop=stop_after_attempt(10))
def get_id_list_from_a_website(session, dict_item):
    key, url = dict_item

    id_list_return = []
    # ... scrape the website with `session` to build the id list here ...
    if not id_list_return:
        raise TryAgain()
    return id_list_return

def insert_sql(id_after_check, list_of_variable_after_check):
    id = id_after_check
    list_of_variable = list_of_variable_after_check
    if id == 'exist':
        print('key_exist')
    else:
        try:
            sql1 = "INSERT ..."  # build the INSERT of list_of_variable for this id here
            cur.execute(sql1)
            cur.close()  # NOTE: this closes the cursor shared by all threads
            db.commit()
            print('successfully inserted')
        except Exception:
            print('except')

def get_some_data(session, executor):
    dict_items = url_dict.items()
    id_list_return = list(executor.map(partial(get_id_list_from_a_website, session), dict_items))
    raw_id_list_return = []

    for i in range(len(id_list_return)):
        raw_id_list_return = raw_id_list_return + id_list_return[i]

    futures = {executor.submit(partial(check_stuff, session), stuff_id): stuff_id
               for stuff_id in raw_id_list_return}
    for future in as_completed(futures):
        stuff_id_data = future.result()
        stuff_id = stuff_id_data[0]
        stuff_variables = stuff_id_data[1:]
        if stuff_variables == ['stuff_id existed']:
            print('stuff_id_exist')
        else:
            print('go_insert')
            executor.submit(partial(insert_sql, stuff_id), stuff_variables)
    db.close()

@retry(stop=stop_after_attempt(10))
def check_stuff(session, stuff_id):
    sql2 = "SELECT * FROM `tbl1` WHERE stuff_id = %s;"
    cur.execute(sql2, (stuff_id,))
    rows = cur.fetchall()

    if rows:
        # id already in the table: just refresh the search date
        sql3 = "UPDATE `tbl1` SET latest_search_date = %s WHERE stuff_id = %s;"
        cur.execute(sql3, (search_date, stuff_id))
        cur.close()  # NOTE: this also closes the shared cursor
        stuff_id_and_variables = [stuff_id, 'stuff_id existed']

    else:
        # ... scrape the data here and build `stuff_id_and_variables` ...
        if not stuff_id_and_variables:
            raise TryAgain()

    return stuff_id_and_variables

N_THREADS=50
with requests.Session() as session:
    with ThreadPoolExecutor(max_workers=N_THREADS) as executor:
        get_some_data(session, executor)

The problem is: when I check the id in the database (executing sql2), the first few sql2 queries run properly and return 'stuff_id existed'. However, after some queries, sql2 no longer runs properly even though the stuff_id already exists in the database, so check_stuff() scrapes the data again. You can imagine that insert_sql() then raises a duplicate-key exception. I could change INSERT INTO to REPLACE INTO to finish the data inserting process, but I want the code to run without exceptions and to spend less time on duplicate scraping. Thanks.
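The likely culprit is the single connection/cursor shared by all workers: once any thread calls cur.close(), later sql2 executions in other threads misbehave. Below is a minimal sketch of the usual fix, assuming the PooledDB pool defined above: each worker borrows its own connection from the pool and returns it when done, and the insert is made idempotent with ON DUPLICATE KEY UPDATE. The column some_col, the single-value insert, and the reuse of search_date are placeholders, not names from the original code, and this assumes stuff_id carries a UNIQUE or PRIMARY KEY index.

from contextlib import closing

def check_stuff(session, stuff_id):
    # Each call borrows its own connection; closing() returns it to the pool.
    with closing(pool.connection()) as conn, closing(conn.cursor()) as cur:
        cur.execute("SELECT 1 FROM `tbl1` WHERE stuff_id = %s", (stuff_id,))
        if cur.fetchone():
            # id already known: just refresh the search date
            cur.execute("UPDATE `tbl1` SET latest_search_date = %s WHERE stuff_id = %s",
                        (search_date, stuff_id))
            return [stuff_id, 'stuff_id existed']
    # ... otherwise scrape with `session` and return [stuff_id, *variables] ...

def insert_sql(stuff_id, stuff_variables):
    with closing(pool.connection()) as conn, closing(conn.cursor()) as cur:
        # With a UNIQUE key on stuff_id, re-inserting a known id becomes an
        # update instead of raising IntegrityError.
        cur.execute("INSERT INTO `tbl1` (stuff_id, some_col) VALUES (%s, %s) "
                    "ON DUPLICATE KEY UPDATE some_col = VALUES(some_col)",
                    (stuff_id, stuff_variables[0]))

Since the pool was created with autocommit=True, no explicit commit() is needed; dropping the module-level db/cur entirely removes the shared-state failure.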
