October 19, 2024

Run Celery Worker

from celery import Celery

if __name__ == ‘__main__’:
app = Celery(
‘my_worker’,
broker=’amqp://localhost:3333′,
backend=’redis://localhost:6379′
)
queue_name = ‘mytask’

@app.task(
bind=True, ignore_result=False, queue=queue_name, name=’add_numbers’
)
def add(self, x, y):
print(f”Executing task id: {self.request.id}”)
return x + y

argv = [
‘worker’,
‘–loglevel’,
‘DEBUG’,
‘solo’,
‘-concurrency’,
‘1’,
‘-Q’,
queue_name
]
app.worker_main(argv)

Call function from Publisher thru broker

from celery import Celery
from celery.result import AsyncResult

if __name__ == ‘__main__’:
app = Celery(
‘my_publisher’,
broker=’amqp://localhost:3333′,
backend=’redis://localhost:6379′
)
tasks = celery_broker.control.inspect()
print(tasks.active())
queue_name = ‘mytask’
task: celery.result.AsyncResult = celery_broker.send_task(
“add_numbers”,
params,
queue=queue_name
)

while True:
print(task.task_id)
result = AsyncResult(task.task _id)
print(result.status)
time.sleep(1)
if result.status in [“STARTED”, “PENDING”, “PROGRESS”]:
continue
elif result.status == “SUCCESS” :
print(
result.result
)
break
else:
break