FastAPI生产环境部署:高并发API服务实践
FastAPI生产环境部署:高并发API服务实践
FastAPI是一个现代化的Python Web框架,本文分享在生产环境中部署FastAPI服务的实践经验。
FastAPI核心特性
异步支持
from fastapi import FastAPI
import asyncio
app = FastAPI()
@app.get("/async-endpoint")
async def async_handler():
# 异步I/O操作
result = await some_async_operation()
return result
自动API文档
FastAPI自动生成OpenAPI文档:
- Swagger UI:
/docs - ReDoc:
/redoc - OpenAPI JSON:
/openapi.json
高并发架构设计
异步处理架构
from fastapi import FastAPI, BackgroundTasks
from concurrent.futures import ThreadPoolExecutor
import asyncio
app = FastAPI()
# 线程池用于CPU密集型任务
executor = ThreadPoolExecutor(max_workers=10)
@app.post("/process")
async def process_data(data: dict, background_tasks: BackgroundTasks):
# 异步处理
result = await async_process(data)
# 后台任务
background_tasks.add_task(cleanup_task, data)
return result
async def async_process(data):
# I/O密集型操作使用async/await
async with aiohttp.ClientSession() as session:
response = await session.post(url, json=data)
return await response.json()
def cpu_intensive_task(data):
# CPU密集型任务使用线程池
return heavy_computation(data)
连接池优化
import asyncpg
from contextlib import asynccontextmanager
# 全局连接池
pool = None
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时创建连接池
global pool
pool = await asyncpg.create_pool(
host="localhost",
port=5432,
database="mydb",
user="user",
password="password",
min_size=10, # 最小连接数
max_size=50, # 最大连接数
max_queries=50000, # 每个连接最大查询数
max_inactive_connection_lifetime=300 # 空闲连接超时
)
yield
# 关闭时清理连接池
await pool.close()
app = FastAPI(lifespan=lifespan)
@app.get("/query")
async def query_database():
async with pool.acquire() as connection:
result = await connection.fetch("SELECT * FROM table")
return result
性能优化技巧
1. 响应压缩
from fastapi.middleware.gzip import GZipMiddleware
app.add_middleware(GZipMiddleware, minimum_size=1000)
2. 请求限流
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.get("/api")
@limiter.limit("10/minute")
async def limited_endpoint(request: Request):
return {"message": "OK"}
3. 缓存策略
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
import redis
@app.on_event("startup")
async def startup():
redis_client = redis.from_url("redis://localhost")
FastAPICache.init(RedisBackend(redis_client), prefix="fastapi-cache")
@app.get("/cached")
@cache(expire=60) # 缓存60秒
async def cached_endpoint():
# 耗时操作
result = expensive_operation()
return result
4. 数据库查询优化
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
# 使用selectinload避免N+1查询
async def get_users_with_posts(db: AsyncSession):
result = await db.execute(
select(User)
.options(selectinload(User.posts))
)
return result.scalars().all()
部署配置
Docker部署
FROM python:3.11-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制代码
COPY . .
# 使用Gunicorn + Uvicorn workers
CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]
Gunicorn配置
# gunicorn_config.py
bind = "0.0.0.0:8000"
workers = 4
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
max_requests = 10000
max_requests_jitter = 1000
timeout = 30
keepalive = 5
Nginx反向代理
upstream fastapi_backend {
least_conn;
server 127.0.0.1:8000;
server 127.0.0.1:8001;
server 127.0.0.1:8002;
server 127.0.0.1:8003;
}
server {
listen 80;
server_name api.example.com;
location / {
proxy_pass http://fastapi_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# WebSocket支持
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
监控和日志
结构化日志
import logging
import json
from pythonjsonlogger import jsonlogger
# 配置JSON日志
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
logger = logging.getLogger()
logger.addHandler(logHandler)
logger.setLevel(logging.INFO)
@app.middleware("http")
async def log_requests(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
logger.info({
"method": request.method,
"path": request.url.path,
"status_code": response.status_code,
"process_time": process_time
})
return response
Prometheus指标
from prometheus_client import Counter, Histogram, generate_latest
from fastapi import Response
REQUEST_COUNT = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
REQUEST_DURATION = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint']
)
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
REQUEST_DURATION.labels(
method=request.method,
endpoint=request.url.path
).observe(duration)
return response
@app.get("/metrics")
async def metrics():
return Response(content=generate_latest(), media_type="text/plain")
错误处理
from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content={
"error": exc.detail,
"path": request.url.path
}
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=422,
content={
"error": "Validation error",
"details": exc.errors()
}
)
性能测试结果
在4核8GB服务器上的压测结果:
- QPS: 5000+
- P50延迟: 15ms
- P99延迟: 80ms
- 并发连接: 1000+
最佳实践
- 使用异步I/O:充分利用FastAPI的异步特性
- 连接池管理:合理配置数据库连接池
- 监控告警:建立完善的监控体系
- 错误处理:统一的错误处理机制
- 文档完善:利用自动生成的API文档
总结
FastAPI是一个优秀的现代Web框架,通过合理的架构设计和优化配置,可以构建高性能的生产级API服务。关键是要充分利用异步特性,优化I/O操作,并建立完善的监控体系。
希望这些经验对正在部署FastAPI服务的开发者有所帮助!