首頁>技術>

任務背景

emmmm,最近沉迷學習無法自拔,不過就在昨天收到了一個新需求,因為目前程式的某些地方計算所需的時間太太太太久了,導致讓使用者等待返回結果的時間太久,嚴重影響了使用者體驗急需搞定這件事情,不出意外這個任務又又又落到了我的身上。

使用者使用邏輯與後端處理邏輯圖

從圖中可以發現第二步會消耗大量的時間,並且在經過計算後僅僅會給使用者返回一個id,使用者需要攜帶此id再次發起請求獲取結果,所以根據邏輯圖可以確定後端處理檔案這件事可以非同步進行,沒必要叫客戶等待,那如何做到這個功能呢?就要用到萬能的Celery了[celery的詳細使用者可以直接查詢官網]

簡單說一下Celery到底是一個什麼東西

Celery是一個簡單,靈活且可靠的分散式系統,是一個任務佇列,可以處理大量訊息,著重於實時處理,同時還支援任務排程,最主要的是Celery是開源免費且好用的~

專案背景及環境是一個提供檔案解析的專案用的是Python語言進行編寫的 版本為3.6.10框架使用的是Flask框架 版本為0.11.1需要新安裝的模組有
redispip install redis==2.10.6 # 高版本redis會引發一個報錯Celerypip install celery==3.1.24
可能遇到的報錯
AttributeError: 'str' object has no attribute 'items'降低redis版本 pip install redis==2.10.6
前置條件

已經安裝redis

示例程式結構及功能

功能是非阻塞發郵件

專案目錄結構|-flask_email_celery| |-app| | |-utils| | | |-__init__.py # 1 號 __init__ 檔案| | | |-send_email.py| | |-views| | | |-__init__.py # 2 號__init__ 檔案| | | |-email_send_cle.py| | |-__init__.py # 3 號__init__ 檔案| | |-worker.py| |-run.py| |-settings.py

目錄結構截圖

原始碼彙總

settings.py

# -*- encoding: utf-8 -*-"""@version : 3.6@File : settings.py@Time : 2020/7/30 15:26@Author : xxx@Software: PyCharm@desc: 配置檔案"""# celery 配置CELERY_BROKER_URL = 'redis://localhost:6379/0'CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'CELERY_TASK_SERIALIZER = 'json'# flask 配置SECRET_KEY = "xxxxxxxxxxx"

run.py

# -*- encoding: utf-8 -*-"""@version : 3.6@File : run.py@Time : 2020/7/30 15:25@Author : xxx@Software: PyCharm@desc: """from app import create_appapp = create_app()if __name__ == "__main__": print(app.url_map) app.run()

app/worker.py

# -*- encoding: utf-8 -*-"""@version : 3.6@File : worker.py@Time : 2020/7/30 10:33@Author : xxxx@Software: PyCharm@desc: 非同步主要控制"""from app import celeryfrom app.utils.send_email import [email protected]()def send_email(to, header, content): email = EmailSend() email.send_text_email(to, header, content)

app/__init__.py

# -*- encoding: utf-8 -*-"""@version : 3.6@File : __init__.py.py@Time : 2020/7/30 15:21@Author : xxx@Software: PyCharm@desc: """from flask import Flaskfrom celery import Celeryimport settingscelery = Celery(__name__, broker=settings.CELERY_BROKER_URL)def create_app(): app = Flask(__name__) app.config.from_object(settings) celery.conf.update(app.config) from .views.email_send_cle import email_sends app.register_blueprint(email_sends) return app

app/utils/send_email.py

"""@version: 3.6@author: xxx@file: send_email.py@time: 2019/10/10 10:57@desc: 統一發郵件"""import smtplibfrom email.mime.text import MIMETextimport loggingclass EmailSend(object): def __init__(self): self.logging = logging.getLogger('Waring') self.email_host = 'smtp.xxx.com' # host self.email_port = 'xxx' # 埠號 self.email_pass = 'xxxx' # smtp密碼 self.from_addr = "[email protected]" # 用來發送郵件的郵箱 def send_text_email(self, to_addrs, subject, content): self.logging.warning('send_text_email is willed Discard') self.logging.error('send_text_email is None') message_text = MIMEText(content, 'plain', 'utf8') message_text['From'] = self.from_addr message_text['To'] = to_addrs message_text['Subject'] = subject try: # 在建立客戶端物件的同時,連線到郵箱伺服器。 client = smtplib.SMTP_SSL(host=self.email_host, port=self.email_port) login_result = client.login(self.from_addr, self.email_pass) print(login_result) # (235, b'Authentication successful') if login_result and login_result[0] == 235: print('Successful login') client.sendmail(self.from_addr, to_addrs, message_text.as_string()) print('Successful mail delivery') else: print('Mail sending exception:', login_result[0], login_result[1]) except Exception as e: self.logging.error('Connecting Mailbox Server Exception:{}'.format(e)) def send_image_email(self): pass def send_word_email(self): pass def send_video_email(self): pass

app/views/email_send_cle.py

# -*- encoding: utf-8 -*-"""@version : 3.6@File : email_send_cle.py@Time : 2020/7/30 15:34@Author : xxx@Software: PyCharm@desc: """from flask import Blueprint, request, jsonifyfrom app.worker import send_emailemail_sends = Blueprint('email_sends', __name__)@email_sends.route('/testemail', methods=['POST'])def reset_password(): email = request.form['email'] content = request.form['content'] header = request.form['header'] send_email.delay(email, header, content) return jsonify(code=200, message=u"非同步郵件傳送成功")
啟動及執行方法

1、首先將虛擬環境到安裝有celery與flask的python環境下

source activate 虛擬環境名

2、啟動redis

redis-server.exe

redis-server啟動樣子

3、命令列模式下啟動flask專案

python run.py

4、啟動celery

celery worker -A app.worker:celery --loglevel=info

5、發起一個請求使用postman

結語

這就是一個比較簡單的應用celery的專案,期間遇到過很多的坑,包括迴圈引用,找不到模組等等,簡直欲仙欲死,經過一天多的時間搞定了這個需求,希望對諸位有所幫助,如果有什麼問題或者好的辦法歡迎交流。

#python##軟體開發##科技#

最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 關於sdk測試,這些你都知道嗎?