I'm attempting to change the tasks available on a python-gearman worker during its work cycle. I want a bit of control over my worker processes so that they can reload from a database. Every worker needs to reload at regular intervals, but I don't want to simply kill the processes, and the service must stay constantly available, which means I have to reload in batches. So I would have 4 workers reloading while another 4 workers are available to process jobs, and then reload the next 4 workers.
Process:

1. Start the `reload` task 4 times. Each `reload` job:
   - unregisters the `reload` task,
   - reloads the dataset,
   - registers a `finishReload` task,
   - returns.
2. Repeat step 1 until there are no workers with the `reload` task registered.
3. Start the `finishReload` (1) task until there are no workers with the `finishReload` task available.

(1) The `finishReload` task unregisters the `finishReload` task, registers the `reload` task, and then returns.
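The batch scheme above can be checked without a gearmand server. Below is a minimal, standalone Python 3 simulation under stated assumptions: `SimWorker`, `reload_job`, `finish_reload_job`, and `rolling_reload` are hypothetical names (not python-gearman APIs), "submitting a job" is modeled as calling a handler on a worker that has that task registered, and "reloading the dataset" is just a counter. It also shows why `finishReload` is needed: if a worker re-registered `reload` right after reloading, a later batch could land on a worker that has already reloaded.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class SimWorker:
    """Hypothetical stand-in for one worker process and its registered tasks."""
    name: str
    tasks: Set[str] = field(default_factory=lambda: {"reversify", "reload"})
    reload_count: int = 0


def reload_job(worker: SimWorker) -> None:
    # Unregister "reload" first so this worker cannot receive a second reload job.
    worker.tasks.discard("reload")
    worker.reload_count += 1  # placeholder for "reload the dataset"
    worker.tasks.add("finishReload")


def finish_reload_job(worker: SimWorker) -> None:
    # Reset the worker so it can take part in the next reload cycle.
    worker.tasks.discard("finishReload")
    worker.tasks.add("reload")


def rolling_reload(workers: List[SimWorker], batch_size: int = 4) -> List[List[str]]:
    """Reload every worker in batches of at most batch_size; return the batches."""
    batches: List[List[str]] = []
    # Steps 1-2: submit "reload" batch_size times until no worker still offers it.
    while True:
        eligible = [w for w in workers if "reload" in w.tasks]
        if not eligible:
            break
        batch = eligible[:batch_size]
        for w in batch:
            reload_job(w)
        batches.append([w.name for w in batch])
    # Step 3: submit "finishReload" until no worker still offers it.
    while True:
        pending = [w for w in workers if "finishReload" in w.tasks]
        if not pending:
            break
        finish_reload_job(pending[0])
    return batches


if __name__ == "__main__":
    pool = [SimWorker(f"w{i}") for i in range(8)]
    for batch in rolling_reload(pool):
        print("reloaded:", batch)
```

With 8 workers and the default batch size, this prints two batches of four (w0 to w3, then w4 to w7), and every worker ends up with `reversify` and `reload` registered again. The simulation runs batches one after another; in the real setup the 4 reloading workers are busy while the other 4 keep serving regular tasks such as `reversify`.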
Now, the problem that I'm running into is that the job fails when I change the tasks that are available to the worker process. There are no error messages or exceptions, just an "ERROR" in the gearmand log. Here's a quick program that replicates the problem.
WORKER

```python
import gearman

def reversify(gmWorker, gmJob):
    # gmJob.data is a string, so slicing already returns the reversed string
    return gmJob.data[::-1]

def strcount(gmWorker, gmJob):
    gmWorker.unregister_task('reversify')  # problem line: changes tasks mid-job
    return str(len(gmJob.data))

worker = gearman.GearmanWorker(['localhost:4730'])
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)

while True:
    worker.work()
```
CLIENT

```python
import gearman

client = gearman.GearmanClient(['localhost:4730'])

a = client.submit_job('reversify', 'spam and eggs')
print(a.result)
# output: sgge dna maps

a = client.submit_job('strcount', 'spam and eggs')
# ...
```
Please let me know if there's anything I can clarify.
EDIT: I know that someone will ask to see the log I mentioned. I've also posted this question to the gearman group on Google, and the log is available there.
It looks like subclassing the `GearmanWorker` class and adding a few flags can work around this issue. I need to let the job complete before I start issuing new commands from the worker to the server; issuing them mid-job seems to interrupt the current job. So if we override the `on_job_complete` method, we can check the enable/disable flags and act on them after we issue the `send_job_complete` command. The new worker program follows:

WORKER