October 19, 2024

1. by using bind=True

For using function over Celery, it is required to use a function ‘Decorator’ in Python primitive. If you add bind=True on the Decorator from celery, task_id can be called.

app = celery.Celery(
    __name__,
    broker=cfg.rabbitmq_url,
    backend=cfg.redis_url
)

@app.task(name="func1", bind=True)
def func1(self, *args, **kwargs):
    print(f"task_id: {self.request.id}")

2. by using current_task from celery

from celery import current_task

@app.task(name="func2")
def func2(self, *args, **kwargs):
    print(f"task_id: {current_task.request.id}")