1. by using bind=True
To run a function as a Celery task, you wrap it with the app.task decorator. If you pass bind=True to that decorator, the task instance is passed to the function as its first argument (self), and the task ID is then available as self.request.id.
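import celery

# cfg is your own settings object holding the broker and backend URLs
app = celery.Celery(
    __name__,
    broker=cfg.rabbitmq_url,
    backend=cfg.redis_url,
)

@app.task(name="func1", bind=True)
def func1(self, *args, **kwargs):
    # bind=True makes self the task instance
    print(f"task_id: {self.request.id}")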
2. by using current_task from celery
from celery import current_task

# No bind=True here, so the function does not take a self argument
@app.task(name="func2")
def func2(*args, **kwargs):
    print(f"task_id: {current_task.request.id}")