在 Flask 中使用 concurrent.futures
模块来实现异步任务是一种有效的方法,可以让你在后台执行耗时的操作,而不会阻塞主线程。concurrent.futures
提供了 ThreadPoolExecutor
和 ProcessPoolExecutor
两种执行器,对于 Flask 这样的 IO 密集型应用,通常使用 ThreadPoolExecutor
就足够了。
以下是一个示例,展示了如何在 Flask 应用中使用 concurrent.futures.ThreadPoolExecutor
来运行异步任务:
from flask import Flask, jsonify
import concurrent.futures
import time
app = Flask(__name__)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
def background_task(task_id):
"""模拟一个耗时的任务"""
time.sleep(5) # 模拟耗时操作
return f"Task {task_id} completed"
@app.route('/start-task/<int:task_id>')
def start_task(task_id):
"""启动一个后台任务并立即返回响应"""
executor.submit(background_task, task_id)
return jsonify({"message": f"Task {task_id} started"}), 202
@app.route('/tasks')
def get_tasks():
"""获取所有任务的状态(示例)"""
# 这里应该有一个数据库或某种存储机制来跟踪任务状态
# 以下返回的是一个静态的示例列表
return jsonify({"tasks": ["Task 1 completed", "Task 2 completed", "Task 3 completed"]}), 200
if __name__ == '__main__':
app.run(debug=True)
在这个例子中,我们定义了一个 background_task
函数,它模拟了一个耗时的任务。在 start_task
视图函数中,我们使用 executor.submit
方法来提交任务到线程池中执行。submit
方法会立即返回一个 Future
对象,但我们不关心这个对象,因为我们不等待任务完成就返回响应。
max_workers
参数定义了线程池中的线程数量,你可以根据你的服务器性能和任务特性来调整这个值。
我们还定义了一个 get_tasks
视图函数,用于获取所有任务的状态。在实际应用中,你应该有一个数据库或其他存储机制来跟踪任务的状态。
请注意,虽然这种方法可以让你在 Flask 应用中运行异步任务,但这些任务仍然是在后台同步执行的。这意味着如果你的 Flask 应用是部署在单个进程中,那么它仍然不能同时处理多个请求。为了充分利用多核 CPU 和处理更多的并发请求,你可能需要使用像 Gunicorn 这样的 WSGI HTTP 服务器,并配合 gevent
或 eventlet
这样的异步库来运行 Flask 应用。
此外,如果你的异步任务是 CPU 密集型的,那么使用 ProcessPoolExecutor
可能会更合适,因为它可以利用多核 CPU 并行执行任务。但是,请注意,进程间的通信成本比线程间的通信成本要高,所以 ProcessPoolExecutor
通常用于执行真正 CPU 密集型的任务。