February 22, 2025

Run Celery Worker

from celery import Celery

if __name__ == '__main__':
    app = Celery(
        'my_worker',
         broker='amqp://localhost:3333',
         backend='redis://localhost:6379'
    )
    queue_name = 'mytask'

    @app.task(
        bind=True, ignore_result=False, queue=queue_name, name='add_numbers'
    )
    def add(self, x, y):
        print(f"Executing task id: {self.request.id}")
        return x + y

    argv = [
        'worker',
        '--loglevel',
        'DEBUG',
        'solo',
        '-concurrency',
        '1',
        ‘-Q',
        queue_name
    ]
    app.worker_main(argv)

Call function from Publisher thru broker

from celery import Celery
from celery.result import AsyncResult


if __name__ == '__main__':
    app = Celery(
        'my_publisher',
         broker='amqp://localhost:3333',
         backend='redis://localhost:6379'
    )
    tasks = celery_broker.control.inspect()
    print(tasks.active())
    queue_name = 'mytask'
    task: celery.result.AsyncResult = celery_broker.send_task(
        "add_numbers",
        params,
        queue=queue_name
    )

    while True:
        print(task.task_id)
        result = AsyncResult(task.task _id)
        print(result.status)
        time.sleep(1)
        if result.status in ["STARTED", "PENDING", "PROGRESS"]:
            continue
        elif result.status == "SUCCESS" :
            print(
                result.result
            )
            break
        else:
            break

Kill the process from broker

celery.revoke(task_id, terminate=True, signal='SIGKILL')