久久国产成人av_抖音国产毛片_a片网站免费观看_A片无码播放手机在线观看,色五月在线观看,亚洲精品m在线观看,女人自慰的免费网址,悠悠在线观看精品视频,一级日本片免费的,亚洲精品久,国产精品成人久久久久久久

分享

超實(shí)用 Demo:使用 FastAPI,、Celery,、RabbitMQ 和 MongoDB 實(shí)現(xiàn)一個(gè)...

 只怕想不到 2022-10-18 發(fā)布于湖北

異步任務(wù),是 Web 開發(fā)中經(jīng)常遇到的問題,,比如說用戶提交了一個(gè)請求,,雖然這個(gè)請求對(duì)應(yīng)的任務(wù)非常耗時(shí),但是不能讓用戶等在這里,,通常需要立即返回結(jié)果,,告訴用戶任務(wù)已提交。任務(wù)可以在后續(xù)慢慢完成,,完成后再給用戶發(fā)一個(gè)完成的通知,。

今天分享一份代碼,使用 Celery,、RabbitMQ 和 MongoDB 實(shí)現(xiàn)一個(gè)異步任務(wù)工作流,,你可以修改 task.py 來實(shí)現(xiàn)你自己的異步任務(wù)。

架構(gòu)圖如下:

圖片

其中 Celery 來執(zhí)行異步任務(wù),,RabbitMQ 作為消息隊(duì)列,,MongoDB 存儲(chǔ)任務(wù)執(zhí)行結(jié)果,F(xiàn)astAPI 提供 Web 接口。

以上所有模塊均可使用 Docker 一鍵部署,。

下面為 Demo 使用方法:

1,、確保本機(jī)已安裝 Docker、Git

2,、下載源代碼:

git clone https://github.com/aarunjith/async-demo.git

3,、部署并啟動(dòng):

cd async-demo
docker compose up --build

4、啟動(dòng)一個(gè)異步任務(wù):

$ curl -X POST http://localhost:8080/process

任務(wù)會(huì)發(fā)送到消息隊(duì)列,,同時(shí)會(huì)立即返回一個(gè)任務(wù) id:

? curl -X POST http://localhost:8080/process
{'status':'PENDING','id':'a129c666-7b5b-45f7-ba54-9d7b96a1fe58','error':''}%

5,、查詢?nèi)蝿?wù)狀態(tài):

curl -X POST http://localhost:8080/check_progress/<task_id>

任務(wù)完成后的返回結(jié)果如下:

 ? curl -X POST http://localhost:8080/check_progress/a129c666-7b5b-45f7-ba54-9d7b96a1fe58
{'status':'SUCEESS','data':'\'hello\''}%

代碼目錄結(jié)構(gòu)如下:

圖片

其中 app.py 如下:

from fastapi import FastAPI
from celery.result import AsyncResult
from tasks import start_processing
from loguru import logger
from pymongo import MongoClient
import uvicorn

# Lets create a connection to our backend where celery stores the results
client = MongoClient('mongodb://mongodb:27017')

# Default database and collection names that Celery create
db = client['task_results']
coll = db['celery_taskmeta']

app = FastAPI()


@app.post('/process')
async def process_text_file():
    '''
    Process endpoint to trigger the start of a process
    '''

    try:
        result = start_processing.delay()
        logger.info(f'Started processing the task with id {result.id}')
        return {
            'status': result.state,
            'id': result.id,
            'error'''
        }
    except Exception as e:
        logger.info(f'Task Execution failed: {e}')
        return {
            'status''FAILURE',
            'id'None,
            'error': e
        }


@app.post('/check_progress/{task_id}')
async def check_async_progress(task_id: str):
    '''
    Endpoint to check the task progress and fetch the results if the task is
    complete.
    '''

    try:
        result = AsyncResult(task_id)
        if result.ready():
            data = coll.find({'_id': task_id})[0]
            return {'status''SUCEESS''data': data['result']}
        else:
            return {'status': result.state, 'error'''}
    except Exception as e:
        data = coll.find({'_id': task_id})[0]
        if data:
            return {'status''SUCEESS''data': data['result']}
        return {'status''Task ID invalid''error': e}

if __name__ == '__main__':
    uvicorn.run('app:app', host='0.0.0.0', port='8080')

如果要實(shí)現(xiàn)自己的任務(wù)隊(duì)列,就修改 task.py 來添加自己的異步任務(wù),,可以整合到自己的項(xiàng)目中,。

最后的話

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,,不代表本站觀點(diǎn),。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,,謹(jǐn)防詐騙,。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報(bào),。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請遵守用戶 評(píng)論公約

    類似文章 更多