Prioritize some workflow executions over others

I've been using the Flow Framework for Amazon SWF and I want to be able to run both priority and normal workflow executions. If there are priority tasks, the activity workers should pick them up ahead of normal-priority tasks. What is the best way to accomplish this?

I'm thinking that the following might work but I wonder if there's a better/recommended approach.

  1. I'll define two activity workers and two task lists for the activity: one priority list and one normal list. Both workers will use the same activity class.
  2. Both workers will run on the same host (EC2 instance).
  3. In the workflow, I'll define two methods: startNormalWorkflow and startHighWorkflow. In the startHighWorkflow method, I can use ActivitySchedulingOptions to put the task on the high priority list.

The problem with this approach is that there is no guarantee that high priority tasks are scheduled before normal tasks.

There are 2 best solutions below

Answer 1 (best answer)

It's a good question; it had me scratching my head for a while.

Of course, there is more than one way to skin this cat, and a number of valid solutions exist. I focused here on the simplest one I could conceive of, namely, executing tasks in order of priority within a single workflow.

The scenario goes as follows: I define one activity worker serving two task lists, default_tasks and urgent_tasks, with trivial logic:

  1. If there are pending tasks on the urgent_tasks list, then pick one from there,
  2. Otherwise, pick a task from default_tasks
  3. Execute any task selected.
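
The selection rule in the list above can be sketched as a small pure function. This is only an illustration: choose_task_list is a hypothetical helper name, and the pending count is passed in as a plain integer instead of being fetched from SWF.

```python
def choose_task_list(urgent_pending,
                     urgent='urgent_tasks',
                     default='default_tasks'):
    """Return the name of the task list the worker should poll next.

    urgent_pending is the number of tasks reported as waiting on the
    urgent list (an assumption here: the caller obtained it elsewhere).
    """
    if urgent_pending > 0:
        return urgent
    return default


# With urgent work pending the urgent list wins; otherwise fall back.
print(choose_task_list(2))
print(choose_task_list(0))
```

Keeping the rule separate from the polling code makes it easy to check without talking to SWF at all.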

The question is how to check if any high priority tasks are pending? CountPendingActivityTasks API comes to the rescue!

I know you use Flow for development. My example is written using boto.swf.layer2 as Python is so much easier for prototyping - but the idea remains the same and can be extended to a more complex scenario with high and low priority workflow executions.

So, to accomplish the above using boto.swf follow these steps:

Export credentials to the environment

$ export AWS_ACCESS_KEY_ID='your access key'
$ export AWS_SECRET_ACCESS_KEY='your secret key'

Get the code snippets

For convenience, you can clone it from GitHub:

$ git clone git@github.com:oozie/stackoverflow.git
$ cd stackoverflow/amazon-swf/priority_tasks/

To bootstrap the domain and the workflow:

# domain_setup.py 
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
VERSION = '1.0'

swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name='SomeActivity', version=VERSION, task_list='default_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()

Decider implementation:

# decider.py
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
ACTIVITY = 'SomeActivity'
VERSION = '1.0'

class MyWorkflowDecider(swf.Decider):

    domain = DOMAIN
    task_list = 'default_tasks'
    version = VERSION

    def run(self):
        history = self.poll()
        print(history)
        if 'events' in history:
            # Get a list of non-decision events to see what event came in last.
            workflow_events = [e for e in history['events']
                               if not e['eventType'].startswith('Decision')]

            decisions = swf.Layer1Decisions()

            last_event = workflow_events[-1]
            last_event_type = last_event['eventType']

            if last_event_type == 'WorkflowExecutionStarted':
                # At the start, schedule five activities, alternating
                # between the default and urgent task lists.
                decisions.schedule_activity_task(ACTIVITY+'1', ACTIVITY, VERSION, task_list='default_tasks')
                decisions.schedule_activity_task(ACTIVITY+'2', ACTIVITY, VERSION, task_list='urgent_tasks')
                decisions.schedule_activity_task(ACTIVITY+'3', ACTIVITY, VERSION, task_list='default_tasks')
                decisions.schedule_activity_task(ACTIVITY+'4', ACTIVITY, VERSION, task_list='urgent_tasks')
                decisions.schedule_activity_task(ACTIVITY+'5', ACTIVITY, VERSION, task_list='default_tasks')
            elif last_event_type == 'ActivityTaskCompleted':
                # Complete workflow execution after 5 completed activities.
                closed_activity_count = sum(1 for wf_event in workflow_events if wf_event.get('eventType') == 'ActivityTaskCompleted')
                if closed_activity_count == 5:
                    decisions.complete_workflow_execution()

            self.complete(decisions=decisions)
            return True

Prioritizing worker implementation:

# worker.py
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
VERSION = '1.0'

class PrioritizingWorker(swf.ActivityWorker):

    domain = DOMAIN
    version = VERSION

    def run(self):
        # Ask SWF how many tasks are waiting on the urgent list and poll that
        # list first; fall back to the default list only when it reports 0.
        urgent_task_count = swf.Domain(name=DOMAIN).count_pending_activity_tasks('urgent_tasks').get('count', 0)
        if urgent_task_count > 0:
            self.task_list = 'urgent_tasks'
        else:
            self.task_list = 'default_tasks'
        activity_task = self.poll()

        if 'activityId' in activity_task:
            print('%d urgent tasks in the queue. Executing %s'
                  % (urgent_task_count, activity_task.get('activityId')))
            self.complete()
            return True

Run the workflow from three instances of an interactive Python shell

Run the decider:

$ python -i decider.py
>>> while MyWorkflowDecider().run(): pass
... 

Start an execution:

$ python -i decider.py 
>>> swf.WorkflowType(domain='stackoverflow', name='MyWorkflow', version='1.0', task_list='default_tasks').start()

Finally, kick off the worker and watch the tasks as they're getting executed:

$ python -i worker.py 
>>> while PrioritizingWorker().run(): pass
... 
2 urgent tasks in the queue. Executing SomeActivity2
1 urgent tasks in the queue. Executing SomeActivity4
0 urgent tasks in the queue. Executing SomeActivity5
0 urgent tasks in the queue. Executing SomeActivity1
0 urgent tasks in the queue. Executing SomeActivity3
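
To see why the urgent activities drain first, the worker loop can be simulated without SWF. The sketch below is an assumption-laden stand-in, not the code from the repository: FakeTaskLists and run_once are hypothetical names, the queues are plain in-memory deques, and tasks within one list are served first-in first-out, which the real run above did not do for default_tasks.

```python
from collections import deque


class FakeTaskLists(object):
    """Hypothetical in-memory stand-in for SWF task lists (not a boto class)."""

    def __init__(self, lists):
        self.lists = dict((name, deque(tasks)) for name, tasks in lists.items())

    def count_pending(self, name):
        return len(self.lists[name])

    def poll(self, name):
        # Unlike SWF, return None immediately when the list is empty.
        return self.lists[name].popleft() if self.lists[name] else None


def run_once(task_lists):
    """One worker iteration: count urgent tasks, pick a list, poll it."""
    urgent = task_lists.count_pending('urgent_tasks')
    name = 'urgent_tasks' if urgent > 0 else 'default_tasks'
    task = task_lists.poll(name)
    if task is None:
        return None
    return '%d urgent tasks in the queue. Executing %s' % (urgent, task)


task_lists = FakeTaskLists({
    'urgent_tasks': ['SomeActivity2', 'SomeActivity4'],
    'default_tasks': ['SomeActivity1', 'SomeActivity3', 'SomeActivity5'],
})
log = []
while True:
    line = run_once(task_lists)
    if line is None:
        break
    log.append(line)
print('\n'.join(log))
```

In the simulation the count is always exact and poll never blocks, so it only shows the intended ordering, not how SWF behaves under load.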

Answer 2

It turns out that using a separate task list that you have to check first doesn't work well.

There are a couple of problems.

First, the count API returns only an approximate count and doesn't update reliably. So you may get 0 tasks even when there are urgent tasks in the queue.

Second, the call that polls for tasks hangs if there are no tasks available. So when you poll for the non-urgent tasks, that will "stick" for either 2 minutes, or until you have a non-urgent task to do.

So this can cause all kinds of problems in your workflow.

For this to work, SWF would have to implement a polling API that could return the first task from a list of task lists. Then it would be much easier.
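
To make the wished-for behavior concrete, here is a minimal in-memory sketch of a poll that takes task lists in priority order and returns the first available task. It is purely hypothetical: poll_first is not an SWF or boto call, and the queues are local deques standing in for task lists.

```python
from collections import deque


def poll_first(queues, task_lists):
    """Return (task_list, task) from the first non-empty list, else None.

    queues maps task list names to deques of pending tasks; task_lists
    gives the names in priority order, highest priority first.
    """
    for name in task_lists:
        if queues.get(name):
            return name, queues[name].popleft()
    return None


queues = {
    'urgent_tasks': deque(['urgent-1']),
    'default_tasks': deque(['default-1']),
}
priority_order = ['urgent_tasks', 'default_tasks']

first = poll_first(queues, priority_order)
second = poll_first(queues, priority_order)
third = poll_first(queues, priority_order)
print(first, second, third)
```

With a single call like this, the worker would need neither the separate count request nor a blocking poll on the lower-priority list.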