I want to pass a function as a parameter to a Celery task. I found two similar questions on Stack Overflow (1 and 2). I tried the solutions mentioned in the answers, and this is what I'm currently doing:
Inside caller module:
import marshal

def function_to_be_passed():
    # ...
    pass

serialized_func_code = marshal.dumps(function_to_be_passed.__code__)
celery_task.delay(serialized_func_code)
And inside the celery task, I'm deserializing and calling the function:
import marshal, types
from celery.task import task

@task()
def celery_task(serialized_func_code):
    code = marshal.loads(serialized_func_code)
    func = types.FunctionType(code, globals(), "some_func_name")
    # calling the deserialized function
    func()
However, upon calling celery_task.delay(serialized_func_code), I get an error:
kombu.exceptions.EncodeError: 'utf-8' codec can't decode byte 0xe3 in position 0: invalid continuation byte
Sharing stack trace below:
File "...caller.py",
celery_task.delay(
File "/python3.8/site-packages/celery/app/task.py", line 426, in delay
return self.apply_async(args, kwargs)
File "/python3.8/site-packages/celery/app/task.py", line 555, in apply_async
content_type, content_encoding, data = serialization.dumps(
File "/python3.8/site-packages/kombu/serialization.py", line 221, in dumps
payload = encoder(data)
File "/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/python3.8/site-packages/kombu/serialization.py", line 54, in _reraise_errors
reraise(wrapper, wrapper(exc), sys.exc_info()[2])
File "/python3.8/site-packages/vine/five.py", line 194, in reraise
raise value.with_traceback(tb)
File "/python3.8/site-packages/kombu/serialization.py", line 50, in _reraise_errors
yield
File "/python3.8/site-packages/kombu/serialization.py", line 221, in dumps
payload = encoder(data)
File "/python3.8/site-packages/kombu/utils/json.py", line 69, in dumps
return _dumps(s, cls=cls or _default_encoder,
File "/python3.8/site-packages/simplejson/__init__.py", line 398, in dumps
return cls(
File "/python3.8/site-packages/simplejson/encoder.py", line 296, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/python3.8/site-packages/simplejson/encoder.py", line 378, in iterencode
return _iterencode(o, 0)
kombu.exceptions.EncodeError: 'utf-8' codec can't decode byte 0xe3 in position 0: invalid continuation byte
A similar error occurs when I use dumps and loads provided by pickle:
kombu.exceptions.EncodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
I'm using Django version 2.2.17 and Python version 3.8.5.
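A good answer can be found here. In summary, the error occurs because Celery's default JSON serializer cannot encode raw bytes such as the output of marshal.dumps or pickle.dumps. You can change the task serializer from json to pickle, as mentioned here (see the task_serializer and accept_content settings). Be careful never to unpickle data from untrusted sources, since unpickling can execute arbitrary code.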
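If switching the serializer is not an option, a different approach (not the one from the linked answer) is to keep the JSON serializer and turn the marshalled bytes into plain ASCII text with base64 before calling delay. The following is only a minimal sketch under that assumption: encode_function, decode_function and greet are hypothetical names used for illustration, and the Celery call itself is left out so the snippet runs on its own.

```python
import base64
import json
import marshal
import types


def encode_function(func):
    """Serialize a function's code object into a JSON-safe ASCII string."""
    raw = marshal.dumps(func.__code__)  # raw bytes, generally not valid UTF-8
    return base64.b64encode(raw).decode("ascii")


def decode_function(encoded, name="restored_func"):
    """Rebuild a callable from a string produced by encode_function."""
    code = marshal.loads(base64.b64decode(encoded))
    return types.FunctionType(code, globals(), name)


def greet():
    # Hypothetical function standing in for function_to_be_passed.
    return "hello from the worker"


# Caller side: this string is what would be passed to celery_task.delay(...).
payload = encode_function(greet)
json.dumps(payload)  # succeeds, because payload is a str rather than bytes

# Worker side: what the task body would do with its argument.
restored = decode_function(payload)
result = restored()
```

The same caveats apply as with pickle: marshal data should only be loaded from trusted sources, its format is specific to the Python version, and this sketch only covers simple functions without closures or default arguments.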