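Asynchronous tasks come up constantly in Web development. Suppose a user submits a request whose underlying task takes a long time to run. The user should not be left waiting, so the server usually returns a response right away to confirm that the task has been submitted. The task then runs in the background, and the user is notified once it has finished.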
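The submit-now, finish-later pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the setup used in this article: the thread pool and the in-memory `tasks` dictionary stand in for a real task queue and result backend, and `slow_job`, `submit`, and `check` are hypothetical names.

```python
import time
import uuid
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict

# Assumption: a thread pool and a dict stand in for a task queue and a
# result backend. A real deployment would use a broker and a database.
executor = ThreadPoolExecutor(max_workers=2)
tasks: Dict[str, Future] = {}


def slow_job() -> str:
    """Hypothetical long-running job."""
    time.sleep(0.2)
    return "hello"


def submit() -> dict:
    """Queue the job and return immediately with a task id."""
    task_id = str(uuid.uuid4())
    tasks[task_id] = executor.submit(slow_job)
    return {"status": "PENDING", "id": task_id, "error": ""}


def check(task_id: str) -> dict:
    """Report the task state, including the result once it is available."""
    future = tasks.get(task_id)
    if future is None:
        return {"status": "UNKNOWN", "error": "no such task"}
    if not future.done():
        return {"status": "PENDING", "error": ""}
    if future.exception() is not None:
        return {"status": "FAILURE", "error": str(future.exception())}
    return {"status": "SUCCESS", "data": future.result()}


if __name__ == "__main__":
    ticket = submit()            # returns before slow_job has finished
    print(ticket["status"])
    time.sleep(0.5)              # give the background job time to complete
    print(check(ticket["id"]))
```

The caller gets the task id back without waiting for `slow_job`, and can ask for the outcome later with that id.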
$ curl -X POST http://localhost:8080/process
{'status':'PENDING','id':'a129c666-7b5b-45f7-ba54-9d7b96a1fe58','error':''}
5. Check the task status:
curl -X POST http://localhost:8080/check_progress/<task_id>
Once the task has completed, the response looks like this:
$ curl -X POST http://localhost:8080/check_progress/a129c666-7b5b-45f7-ba54-9d7b96a1fe58
{'status':'SUCCESS','data':'\'hello\''}
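Instead of re-running curl by hand, a client can poll the status endpoint until the task leaves its in-progress states. The following is a rough sketch under a few assumptions: the base URL is the one from the curl examples, `fetch_progress` and `wait_for_task` are hypothetical helper names, and the set of in-progress values follows Celery's built-in task states.

```python
import json
import time
import urllib.request

BASE_URL = "http://localhost:8080"  # taken from the curl examples

# Celery states that mean "not finished yet"; anything else is treated
# as a final answer by this sketch.
IN_PROGRESS = {"PENDING", "RECEIVED", "STARTED", "RETRY"}


def fetch_progress(task_id: str) -> dict:
    """POST to /check_progress/<task_id> and decode the JSON body."""
    request = urllib.request.Request(
        f"{BASE_URL}/check_progress/{task_id}", method="POST"
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))


def wait_for_task(task_id, fetch=fetch_progress, interval=1.0, timeout=60.0):
    """Poll until the reported status is no longer in progress.

    `fetch` is injectable so the loop can be exercised without a server.
    Raises TimeoutError if the task is still running after `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        reply = fetch(task_id)
        if reply.get("status") not in IN_PROGRESS:
            return reply
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} still running after {timeout}s")
        time.sleep(interval)
```

With the server running, `wait_for_task('<task_id>')` returns the final response dictionary. A fixed polling interval keeps the sketch short; a longer-running client might prefer a backoff.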
The code directory structure is as follows:
app.py looks like this:
from fastapi import FastAPI
from celery.result import AsyncResult
from tasks import start_processing
from loguru import logger
from pymongo import MongoClient
import uvicorn

# Let's create a connection to our backend where Celery stores the results
client = MongoClient('mongodb://mongodb:27017')

# Default database and collection names that Celery creates
db = client['task_results']
coll = db['celery_taskmeta']

app = FastAPI()

@app.post('/process')
async def process_text_file():
    '''
    Process endpoint to trigger the start of a process
    '''
    try:
        # .delay() queues the task and returns an AsyncResult right away
        result = start_processing.delay()
        logger.info(f'Started processing the task with id {result.id}')
        return {
            'status': result.state,
            'id': result.id,
            'error': ''
        }
    except Exception as e:
        logger.error(f'Task Execution failed: {e}')
        return {
            'status': 'FAILURE',
            'id': None,
            # Exception objects are not JSON serializable, so send the message
            'error': str(e)
        }

@app.post('/check_progress/{task_id}')
async def check_async_progress(task_id: str):
    '''
    Endpoint to check the task progress and fetch the results if the task is
    complete.
    '''
    try:
        result = AsyncResult(task_id)
        if result.ready():
            data = coll.find({'_id': task_id})[0]
            return {'status': 'SUCCESS', 'data': data['result']}
        else:
            return {'status': result.state, 'error': ''}
    except Exception as e:
        # Fall back to reading the stored result directly. find_one returns
        # None for an unknown id instead of raising like find(...)[0] would.
        data = coll.find_one({'_id': task_id})
        if data:
            return {'status': 'SUCCESS', 'data': data['result']}
        return {'status': 'Task ID invalid', 'error': str(e)}

if __name__ == '__main__':
    # The port must be an integer, not a string
    uvicorn.run('app:app', host='0.0.0.0', port=8080)