在 Flask 中使用 concurrent.futures 模块来实现异步任务是一种有效的方法,可以让你在后台执行耗时的操作,而不会阻塞主线程。concurrent.futures 提供了 ThreadPoolExecutorProcessPoolExecutor 两种执行器,对于 Flask 这样的 IO 密集型应用,通常使用 ThreadPoolExecutor 就足够了。

以下是一个示例,展示了如何在 Flask 应用中使用 concurrent.futures.ThreadPoolExecutor 来运行异步任务:

from flask import Flask, jsonify
import concurrent.futures
import time

app = Flask(__name__)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)

def background_task(task_id):
    """模拟一个耗时的任务"""
    time.sleep(5)  # 模拟耗时操作
    return f"Task {task_id} completed"

@app.route('/start-task/<int:task_id>')
def start_task(task_id):
    """启动一个后台任务并立即返回响应"""
    executor.submit(background_task, task_id)
    return jsonify({"message": f"Task {task_id} started"}), 202

@app.route('/tasks')
def get_tasks():
    """获取所有任务的状态(示例)"""
    # 这里应该有一个数据库或某种存储机制来跟踪任务状态
    # 以下返回的是一个静态的示例列表
    return jsonify({"tasks": ["Task 1 completed", "Task 2 completed", "Task 3 completed"]}), 200

if __name__ == '__main__':
    app.run(debug=True)

在这个例子中,我们定义了一个 background_task 函数,它模拟了一个耗时的任务。在 start_task 视图函数中,我们使用 executor.submit 方法来提交任务到线程池中执行。submit 方法会立即返回一个 Future 对象,但我们不关心这个对象,因为我们不等待任务完成就返回响应。

max_workers 参数定义了线程池中的线程数量,你可以根据你的服务器性能和任务特性来调整这个值。

我们还定义了一个 get_tasks 视图函数,用于获取所有任务的状态。在实际应用中,你应该有一个数据库或其他存储机制来跟踪任务的状态。

请注意,虽然这种方法可以让你在 Flask 应用中运行异步任务,但这些任务仍然是在后台同步执行的。这意味着如果你的 Flask 应用是部署在单个进程中,那么它仍然不能同时处理多个请求。为了充分利用多核 CPU 和处理更多的并发请求,你可能需要使用像 Gunicorn 这样的 WSGI HTTP 服务器,并配合 geventeventlet 这样的异步库来运行 Flask 应用。

此外,如果你的异步任务是 CPU 密集型的,那么使用 ProcessPoolExecutor 可能会更合适,因为它可以利用多核 CPU 并行执行任务。但是,请注意,进程间的通信成本比线程间的通信成本要高,所以 ProcessPoolExecutor 通常用于执行真正 CPU 密集型的任务。