I have a Python method that executes concurrently. I use database FOR UPDATE locks to control the concurrency. The expected behavior is that when one thread reads the data and locks the record to perform a transactional update, the second thread waits. After the first thread commits the transaction and releases the lock, the second thread continues and gets the updated value from the first thread.
However, this is currently not working as expected - the second thread is unable to retrieve the updated value from the first thread. Can you help me debug what's going on? I'm providing the code and execution logs below.
async def to_run():
threadId = random.randint(0, 100)
task_type = TaskType.OTHER
async with async_db_session() as db:
async with db.begin() as trans:
result = await db.execute(text("SELECT @@session.transaction_isolation"))
isolation_level = result.scalar()
log.info(f"Current isolation level: {isolation_level}")
result = await db.execute(text("SELECT @@autocommit"))
current_autocommit_mode = result.scalar()
log.info(f"Current auto-commit mode: {current_autocommit_mode}")
task = await TaskDao.get_first_task(db, task_type)
log.info(f"get task from db,task type={task_type}, task={task}")
if task is None:
await trans.commit()
await asyncio.sleep(2)
return
log.info(f"{task.task_id} : {threadId} : start get task lock")
task = await TaskDao.lock_by_id(db, pk=task.id)
log.info(f"{task.task_id} : {threadId} : finish get task lock, task_status={task.task_status}")
await asyncio.sleep(1)
task = await TaskDao.get(db, pk=task.id)
log.info(f"{task.task_id} : {threadId} : finish get task again, task_status={task.task_status}")
if task.task_status != TaskStatus.INIT.value and task.task_status != TaskStatus.INTERRUPT.value:
log.info(f"{task.task_id} : {threadId} : task is processed by other instance={task.instance_id}")
await asyncio.sleep(2)
await trans.commit()
return
log.info(f"{task.task_id} : {threadId} : start update task status to running")
await TaskService.start_running_task(db, task)
log.info(f"{task.task_id} : {threadId} : finish update task status to running")
log.info(f"{task.task_id} : {threadId} : start commit transaction")
await trans.commit()
db.flush()
log.info(f"{task.task_id} : {threadId} : finish commit transaction")
async with async_db_session() as db:
task = await TaskDao.get(db, pk=task.id)
log.info(f"{task.task_id} : {threadId} : finish get task again and again, task_status={task.task_status}")
log.info(f"{task.task_id} : {threadId} : start task, update db finish")
test:to_run:58 - Current isolation level: READ-COMMITTED
test:to_run:58 - Current isolation level: READ-COMMITTED
test:to_run:62 - Current auto-commit mode: 0
test:to_run:62 - Current auto-commit mode: 0
test:to_run:65 - get task from db,task type=TaskType.OTHER, task=Task(id=4694, task_id=6084002...)
test:to_run:71 - 6084002 : 4 : start get task lock
test:to_run:65 - get task from db,task type=TaskType.OTHER, task=TaskTask(id=4694, task_id=6084002...)
test:to_run:71 - 6084002 : 96 : start get task lock
test:to_run:73 - 6084002 : 4 : finish get task lock, task_status=1
test:to_run:78 - 6084002 : 4 : finish get task again, task_status=1
test:to_run:86 - 6084002 : 4 : start update task status to running
test:to_run:88 - 6084002 : 4 : finish update task status to running
test:to_run:89 - 6084002 : 4 : start commit transaction
test:to_run:73 - 6084002 : 96 : finish get task lock, task_status=1
test:to_run:91 - 6084002 : 4 : finish commit transaction
test:to_run:95 - 6084002 : 4 : finish get task again and again, task_status=10
test:to_run:97 - 6084002 : 4 : start task, update db finish
test:to_run:78 - 6084002 : 96 : finish get task again, task_status=1
test:to_run:86 - 6084002 : 96 : start update task status to running
test:to_run:88 - 6084002 : 96 : finish update task status to running
test:to_run:89 - 6084002 : 96 : start commit transaction
test:to_run:91 - 6084002 : 96 : finish commit transaction
test:to_run:95 - 6084002 : 96 : finish get task again and again, task_status=10
test:to_run:97 - 6084002 : 96 : start task, update db finish
from the logs, thread 4 have get row lock first and updated the value, then thread 96 get the row lock. but thread 96 can't get updated task status in the transcation. after the transaction commited, i start a new session and it get the updated value.
in my opnions, RC(read commited) means in one transaction, it will always get final version of data which commited by other transaction. i don't know why it not works.
i have changed the mysql instance version and it not work.