I hope this message finds you well. I'm currently implementing Temporal to manage workflow execution. I've encountered a challenge and would greatly appreciate your insights and expertise to help me overcome it.
Issue/Question:
We are running the worker as a pod in our cluster (1 replica; we don't want to run more than one). We want to execute multiple workflows at the same time and get their output without waiting for the current workflow to complete its execution (i.e., we want parallel execution within the same worker instance).
Steps I’ve Taken:
I have configured the maximum workflow task and activity concurrency in the worker configuration.
Code or Configuration:
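```python
import asyncio
import concurrent.futures
from datetime import timedelta
from time import sleep  # assumed: the original imports were not shown

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker


def slp(sec):
    sleep(sec)
    return f"slept {sec} sec"


@activity.defn(name="Sleeping 1")
async def sleeping1():
    response = slp(20)
    return response


@activity.defn(name="sleeping 2")
async def sleeping2():
    response = slp(120)
    return response


@workflow.defn
class sleepingWF:
    @workflow.run
    async def run(self, body):
        # Timeout values assumed: the original arguments were cut off, and
        # execute_activity requires a start-to-close or schedule-to-close timeout.
        s1 = await workflow.execute_activity(
            sleeping1, start_to_close_timeout=timedelta(minutes=5)
        )
        s2 = await workflow.execute_activity(
            sleeping2, start_to_close_timeout=timedelta(minutes=5)
        )


async def main():
    TEMPORAL_ENDPOINT = "localhost:7233"
    TASK_QUEUE = "mymac"
    client = await Client.connect(target_host=TEMPORAL_ENDPOINT)
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue=TASK_QUEUE,
            workflows=[sleepingWF],
            activities=[sleeping1, sleeping2],
            activity_executor=activity_executor,
            max_concurrent_workflow_tasks=100,
            max_concurrent_activities=100,
            max_concurrent_workflow_task_polls=10,
            max_concurrent_activity_task_polls=10,
        )
        await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```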
I truly appreciate your time and assistance. If you have any suggestions, insights, or possible solutions, please feel free to share them. Your expertise is invaluable, and I look forward to learning from the community.
Expected Outcome:
workflow_execution_1 : start_time -> 00:00:00 end_time -> 00:02:20
workflow_execution_2 : start_time -> 00:00:00 end_time -> 00:02:20
Total time taken for both executions should be 140 sec
Environment: Python 3.11
Additional Information:
Current total execution time taken:
workflow_execution_1 : start_time -> 00:00:00 end_time -> 00:02:20
workflow_execution_2 : start_time -> 00:02:20 end_time -> 00:04:40
Total time taken for both executions is now 280 sec
Note: I'm still not sure what is happening and why, but in the meantime, let me propose a possible explanation and some solutions.
TLDR
The call to sleep() inside your slp() helper seems to block the concurrent execution of other threads. Try one of these solutions:
- Replace sleep() with asyncio.sleep() (you can then remove the ThreadPoolExecutor altogether); or
- Use a ProcessPoolExecutor instead of a ThreadPoolExecutor.
Details
By default, Temporal's Python SDK executes both workflow tasks and activity tasks in a single thread, based on asyncio. As stated in the Python SDK's documentation: It is often possible, and preferable, to design your activities to not block the thread. For example, if your activities need to perform SQL queries, HTTP requests, or downloads from or uploads to S3, then you can use asyncio-compatible libraries. In the case of the sleep() inside your slp() helper, you can use asyncio.sleep() instead. If you really need to execute blocking code from your activities, for example if your activities run long CPU-intensive logic, then you may instead configure your worker to execute activities through a ThreadPoolExecutor or a ProcessPoolExecutor.
Yet you apparently did the former, and it didn't work. Why? I suppose that's because the sleep function isn't releasing Python's Global Interpreter Lock (GIL), though I'm not sure why. If you don't already know what Python's GIL is, I strongly suggest you take a look at the answers to that other question before you continue reading my answer.
The key thing to note here is that Python's GIL is per-process, not per-thread as one might expect. That considerably limits the level of concurrency you can actually expect from a ThreadPoolExecutor. Put another way, a ThreadPoolExecutor will only help if your code is calling native code that releases the GIL, doing I/O operations, or calling some other Python-provided function that similarly releases the GIL. Otherwise, other threads won't get the chance to continue executing.
I suppose that by sleep(), you really mean time.sleep(), right? time.sleep() is expected to release the GIL, though it looks like it isn't doing so in your case. Could it be that the sleep() in your slp() helper is actually referring to a different function?
How to diagnose this kind of problem?
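One quick way to test the blocking-call hypothesis outside Temporal is a small timing experiment using only the standard library. It runs two sleeps concurrently in three ways: `time.sleep` inside `async def` (the pattern used by `sleeping1`/`sleeping2`), `asyncio.sleep` inside `async def`, and `time.sleep` offloaded to a thread pool. All function names here are hypothetical.

```python
import asyncio
import concurrent.futures
import time


def blocking_sleep(sec: float) -> None:
    time.sleep(sec)  # blocks the calling thread


async def async_with_blocking_call(sec: float) -> None:
    blocking_sleep(sec)  # same pattern as sleeping1/sleeping2: blocks the whole loop


async def async_with_asyncio_sleep(sec: float) -> None:
    await asyncio.sleep(sec)  # suspends only this coroutine


async def timed_gather(*aws) -> float:
    """Run awaitables concurrently and return the wall-clock time taken."""
    start = time.perf_counter()
    await asyncio.gather(*aws)
    return time.perf_counter() - start


async def experiment(sec: float = 0.5) -> dict:
    results = {}
    # 1. Blocking call inside async def: the two calls run one after the other.
    results["async + time.sleep"] = await timed_gather(
        async_with_blocking_call(sec), async_with_blocking_call(sec)
    )
    # 2. asyncio.sleep inside async def: the two sleeps overlap.
    results["async + asyncio.sleep"] = await timed_gather(
        async_with_asyncio_sleep(sec), async_with_asyncio_sleep(sec)
    )
    # 3. Blocking function offloaded to threads: overlaps only if the GIL is released.
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        results["thread pool + time.sleep"] = await timed_gather(
            loop.run_in_executor(pool, blocking_sleep, sec),
            loop.run_in_executor(pool, blocking_sleep, sec),
        )
    return results


if __name__ == "__main__":
    for name, elapsed in asyncio.run(experiment()).items():
        print(f"{name}: {elapsed:.2f} s")
```

The first case is serialized by design of asyncio. If the third case overlaps (roughly one sleep duration rather than two), time.sleep is releasing the GIL, which would suggest the bottleneck is the event loop rather than the GIL; if it does not overlap, that points at the GIL or at a different sleep function.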
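In your question, you gave these timings:
workflow_execution_1 : start_time -> 00:00:00 end_time -> 00:02:20
workflow_execution_2 : start_time -> 00:02:20 end_time -> 00:04:40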
I doubt these numbers are correct, as they would mean that your second workflow execution doesn't start until the first one completes, but in the comments you clearly indicated that you are starting both workflows at the same time.
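To double-check how the two executions are started and timed, a client-side sketch along these lines may help. It assumes a connected temporalio `Client` is passed in, that the workflow type name is `sleepingWF` (the SDK defaults to the class name), and that the task queue is `mymac` as in the question; the `start_both_and_time` helper and its workflow ID scheme are hypothetical.

```python
import asyncio
import time
import uuid


async def start_both_and_time(client, task_queue: str = "mymac", body=None) -> list:
    """Start two executions of sleepingWF back to back, then return
    (workflow_id, result, seconds_until_completion) for each one."""
    t0 = time.monotonic()
    handles = await asyncio.gather(
        *(
            client.start_workflow(
                "sleepingWF",  # workflow type name (defaults to the class name)
                body,  # the single `body` argument of sleepingWF.run
                id=f"sleeping-{i}-{uuid.uuid4()}",  # hypothetical ID scheme
                task_queue=task_queue,
            )
            for i in (1, 2)
        )
    )

    async def wait_for(handle):
        result = await handle.result()
        return handle.id, result, time.monotonic() - t0

    # Both results are awaited concurrently; each elapsed time is measured from t0.
    return await asyncio.gather(*(wait_for(h) for h in handles))
```

Usage, inside an async function: `client = await Client.connect("localhost:7233")` followed by `print(await start_both_and_time(client))`. If the worker really runs both executions in parallel, both should complete after roughly 140 seconds.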
I'd rather expect your timing to be roughly:
Examining the Workflow History is really the best way to understand what's happening. Here's an example of what you should see in the Temporal UI assuming that there is no blocking-sleep issue. Both workflow executions should look the same:
The workflow's start and end times are the ones shown in the green boxes. You can also see the start and end times of your sleep activities (the blue boxes). You may also notice some non-negligible time being spent in workflow task processing, or even timeout errors, which would point to other issues.
If my previous suggestions aren't enough to resolve your issue, please share a screenshot like that one of both workflow histories or, even better, share the full history in JSON format (press the Download button). Make sure to wait for your workflows to complete before you capture the history. Given the workflow code you shared, there should be at least 17 events in the history when the workflow completes, and the very last event in both histories should be a WorkflowExecutionCompleted.
There's a lot to be said about Temporal's workflow history, more than I can write here. If you haven't done so yet, I suggest you take Temporal's 101 and 102 courses; both are available in Python. Among other things, these courses explain how to read the workflow history to debug various types of issues.
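Once you have downloaded a history as JSON, a small standard-library script can print its timeline and check the event count and the final event. It assumes the export has a top-level `events` list whose items carry `eventId`, `eventType` and `eventTime` fields; depending on the tool that produced the file, event types may appear as `WorkflowExecutionCompleted` or `EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED`, so both forms are normalized. The helper names are hypothetical.

```python
import json
import sys


def normalize_event_type(raw: str) -> str:
    """Turn 'EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED' into
    'WorkflowExecutionCompleted'; names already in that form pass through."""
    prefix = "EVENT_TYPE_"
    if raw.startswith(prefix):
        return "".join(part.capitalize() for part in raw[len(prefix):].split("_"))
    return raw


def summarize_history(history: dict) -> dict:
    """Extract (eventId, eventType, eventTime) rows from an exported history."""
    timeline = [
        (
            str(event.get("eventId", "?")),
            normalize_event_type(event.get("eventType", "")),
            event.get("eventTime", "?"),
        )
        for event in history.get("events", [])
    ]
    return {
        "event_count": len(timeline),
        "completed": bool(timeline) and timeline[-1][1] == "WorkflowExecutionCompleted",
        "timeline": timeline,
    }


def main(path: str) -> None:
    with open(path, encoding="utf-8") as f:
        summary = summarize_history(json.load(f))
    for event_id, event_type, event_time in summary["timeline"]:
        print(f"{event_id:>4}  {event_time}  {event_type}")
    print(f"{summary['event_count']} events; last is WorkflowExecutionCompleted: {summary['completed']}")


if __name__ == "__main__" and len(sys.argv) > 1:
    main(sys.argv[1])
```

Run it as `python summarize_history.py history.json` (file names hypothetical). The `ActivityTaskStarted` and `ActivityTaskCompleted` times in the timeline show when each sleep actually ran, which makes serialization between the two executions easy to spot.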