A new version (v0.5) of celery-batches is available which adds support for Celery 5.1 and fixes storing of results when using the RPC result backend.
As explored previously, the RPC result backend works by having a results queue per client. Unfortunately, celery-batches was attempting to store results in a queue named after the task ID instead of the client ID (which Celery internally calls the “correlation ID”) [1].
Fixing this unfortunately requires a change to client code: the batched request must be passed into the mark_as_done method. Using the example from the documentation, with the changed call noted in a comment:
import requests
from urllib.parse import urlparse

from celery_batches import Batches

wot_api_target = 'https://api.mywot.com/0.4/public_link_json'


@app.task(base=Batches, flush_every=100, flush_interval=10)
def wot_api(requests):
    sig = lambda url: url
    responses = wot_api_real(
        (sig(*request.args, **request.kwargs) for request in requests)
    )
    # Use mark_as_done to manually return response data.
    # Changed: pass the batched request so the backend can find the client's queue.
    for response, request in zip(responses, requests):
        app.backend.mark_as_done(request.id, response, request=request)


def wot_api_real(urls):
    domains = [urlparse(url).netloc for url in urls]
    response = requests.get(
        wot_api_target,
        params={'hosts': '/'.join(set(domains)) + '/'}
    )
    return [response.json()[domain] for domain in domains]
[1] The RPC backend has code which pulls out the correlation ID, falling back to the task ID if it is not given. This is called via an override of the store_result method.
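The fallback behavior described above can be sketched in miniature; the names `routing_key_for_result` and `FakeRequest` below are hypothetical stand-ins for illustration, not Celery's actual internals:

```python
def routing_key_for_result(task_id, request=None):
    """Prefer the request's correlation ID (the client's reply queue);
    fall back to the task ID when no request is provided."""
    correlation_id = getattr(request, "correlation_id", None)
    return correlation_id or task_id


class FakeRequest:
    # Stand-in for a task request carrying the client's correlation ID.
    correlation_id = "client-reply-queue-123"


# With the request passed through, the result is routed to the client's queue.
print(routing_key_for_result("task-abc", FakeRequest()))  # client-reply-queue-123

# Without it, the old task-ID fallback applies — the bug celery-batches hit.
print(routing_key_for_result("task-abc"))  # task-abc
```

This is why the changed line in the example above passes request=request: it gives the backend what it needs to route the result to the client rather than to a queue named after the task.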