How to pass a function as an argument to a Celery task?


I want to pass a function as a parameter to a Celery task. I found two similar questions on Stack Overflow (1 and 2). I tried the solutions from their answers, and this is what I'm currently doing:

Inside caller module:

import marshal

def function_to_be_passed():
    # ...

serialized_func_code = marshal.dumps(function_to_be_passed.__code__)
celery_task.delay(serialized_func_code)

And inside the celery task, I'm deserializing and calling the function:

import marshal, types
from celery.task import task

@task()
def celery_task(serialized_func_code):
    code = marshal.loads(serialized_func_code)
    func = types.FunctionType(code, globals(), "some_func_name")

    # calling the deserialized function
    func()

However, upon calling celery_task.delay(serialized_func_code), I get an error:

kombu.exceptions.EncodeError: 'utf-8' codec can't decode byte 0xe3 in position 0: invalid continuation byte

Sharing stack trace below:

File "...caller.py",
  celery_task.delay(
File "/python3.8/site-packages/celery/app/task.py", line 426, in delay
  return self.apply_async(args, kwargs)
File "/python3.8/site-packages/celery/app/task.py", line 555, in apply_async
  content_type, content_encoding, data = serialization.dumps(
File "/python3.8/site-packages/kombu/serialization.py", line 221, in dumps
  payload = encoder(data)
File "/python3.8/contextlib.py", line 131, in __exit__
  self.gen.throw(type, value, traceback)
File "/python3.8/site-packages/kombu/serialization.py", line 54, in _reraise_errors
  reraise(wrapper, wrapper(exc), sys.exc_info()[2])
File "/python3.8/site-packages/vine/five.py", line 194, in reraise
  raise value.with_traceback(tb)
File "/python3.8/site-packages/kombu/serialization.py", line 50, in _reraise_errors
  yield
File "/python3.8/site-packages/kombu/serialization.py", line 221, in dumps
  payload = encoder(data)
File "/python3.8/site-packages/kombu/utils/json.py", line 69, in dumps
  return _dumps(s, cls=cls or _default_encoder,
File "/python3.8/site-packages/simplejson/__init__.py", line 398, in dumps
  return cls(
File "/python3.8/site-packages/simplejson/encoder.py", line 296, in encode
  chunks = self.iterencode(o, _one_shot=True)
File "/python3.8/site-packages/simplejson/encoder.py", line 378, in iterencode
  return _iterencode(o, 0)
kombu.exceptions.EncodeError: 'utf-8' codec can't decode byte 0xe3 in position 0: invalid continuation byte

A similar error occurs when I use pickle's dumps and loads instead of marshal's:

kombu.exceptions.EncodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte

I'm using Django version 2.2.17 and Python version 3.8.5.

There is 1 best solution below

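The error occurs because Celery serializes task arguments as JSON by default, and JSON cannot represent the raw bytes returned by marshal.dumps or pickle.dumps. A good answer can be found here. In summary, you can change the task serializer from json to pickle, as mentioned here: set task_serializer to 'pickle' and add 'pickle' to accept_content so that the worker accepts such messages. Be careful never to unpickle data from untrusted sources, since unpickling can execute arbitrary code.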
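If switching the serializer is not an option, one possible alternative (not from the linked answers, so treat it as a sketch) is to keep JSON and turn the marshalled bytes into text before sending them. The example below uses only the standard library and does not call Celery at all; encode_function and decode_function are hypothetical helper names, and the example function body is made up for illustration. It assumes the caller and the worker run the same Python version, because the marshal format is version-specific, and it carries the same risk as pickle: never load code from a source you do not trust.

```python
import base64
import json
import marshal
import types


def function_to_be_passed():
    # Hypothetical example body. The function must not rely on closures
    # or on globals that are missing in the process that rebuilds it.
    return "hello from the passed function"


def encode_function(func):
    """Marshal a function's code object and wrap the bytes in base64 text."""
    raw = marshal.dumps(func.__code__)  # bytes, which JSON cannot represent
    return base64.b64encode(raw).decode("ascii")


def decode_function(encoded, name="some_func_name"):
    """Rebuild a callable from the text produced by encode_function."""
    code = marshal.loads(base64.b64decode(encoded))
    return types.FunctionType(code, globals(), name)


# Raw marshalled bytes are rejected by a JSON encoder. The standard-library
# json module raises TypeError here; the simplejson encoder in the traceback
# fails differently, by trying to decode the bytes as UTF-8.
try:
    json.dumps(marshal.dumps(function_to_be_passed.__code__))
    json_accepts_bytes = True
except TypeError:
    json_accepts_bytes = False

# The base64 form is a plain str, so it survives JSON encoding.
payload = encode_function(function_to_be_passed)
json.dumps(payload)

# What the task body would do with the received argument.
result = decode_function(payload)()
```

Under these assumptions, the caller would pass payload to celery_task.delay, and the task would call decode_function on its argument instead of calling marshal.loads directly.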