celery-batches 0.5 released!

Published on Monday, May 24, 2021
Tags: celery, celery-batches

A new version (v0.5) of celery-batches is available which adds support for Celery 5.1 and fixes storing of results when using the RPC result backend.

As explored previously, the RPC result backend works by having a results queue per client, unfortunately celery-batches was attempting to store the results in a queue named after the task ID instead of the client ID (Celery internally calls this the “correlation ID”) [1].

This unfortunately requires a change to client code to pass the batched request into the mark_as_done method, using the example from the documnation with the changed line highlighted:

import requests
from urlparse import urlparse

from celery_batches import Batches

wot_api_target = 'https://api.mywot.com/0.4/public_link_json'

@app.task(base=Batches, flush_every=100, flush_interval=10)
def wot_api(requests):
    sig = lambda url: url
    responses = wot_api_real(
        (sig(*request.args, **request.kwargs) for request in requests)
    )
    # use mark_as_done to manually return response data
    for response, request in zip(responses, requests):
        app.backend.mark_as_done(request.id, response, request=request)


def wot_api_real(urls):
    domains = [urlparse(url).netloc for url in urls]
    response = requests.get(
        wot_api_target,
        params={'hosts': ('/').join(set(domains)) + '/'}
    )
    return [response.json[domain] for domain in domains]
[1]The RPC backend has code which pulls out the correlation ID, but falls back to the task ID if not given. This is called via an override of the store_result method.