FastAPI生产环境部署:高并发API服务实践

FastAPI生产环境部署:高并发API服务实践

FastAPI是一个现代化的Python Web框架,本文分享在生产环境中部署FastAPI服务的实践经验。

FastAPI核心特性

异步支持

from fastapi import FastAPI
import asyncio

app = FastAPI()

@app.get("/async-endpoint")
async def async_handler():
    # 异步I/O操作
    result = await some_async_operation()
    return result

自动API文档

FastAPI自动生成OpenAPI文档:

  • Swagger UI: /docs
  • ReDoc: /redoc
  • OpenAPI JSON: /openapi.json

高并发架构设计

异步处理架构

from fastapi import FastAPI, BackgroundTasks
from concurrent.futures import ThreadPoolExecutor
import asyncio

app = FastAPI()

# 线程池用于CPU密集型任务
executor = ThreadPoolExecutor(max_workers=10)

@app.post("/process")
async def process_data(data: dict, background_tasks: BackgroundTasks):
    # 异步处理
    result = await async_process(data)
    
    # 后台任务
    background_tasks.add_task(cleanup_task, data)
    
    return result

async def async_process(data):
    # I/O密集型操作使用async/await
    async with aiohttp.ClientSession() as session:
        response = await session.post(url, json=data)
        return await response.json()

def cpu_intensive_task(data):
    # CPU密集型任务使用线程池
    return heavy_computation(data)

连接池优化

import asyncpg
from contextlib import asynccontextmanager

# 全局连接池
pool = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动时创建连接池
    global pool
    pool = await asyncpg.create_pool(
        host="localhost",
        port=5432,
        database="mydb",
        user="user",
        password="password",
        min_size=10,  # 最小连接数
        max_size=50,  # 最大连接数
        max_queries=50000,  # 每个连接最大查询数
        max_inactive_connection_lifetime=300  # 空闲连接超时
    )
    yield
    # 关闭时清理连接池
    await pool.close()

app = FastAPI(lifespan=lifespan)

@app.get("/query")
async def query_database():
    async with pool.acquire() as connection:
        result = await connection.fetch("SELECT * FROM table")
    return result

性能优化技巧

1. 响应压缩

from fastapi.middleware.gzip import GZipMiddleware

app.add_middleware(GZipMiddleware, minimum_size=1000)

2. 请求限流

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/api")
@limiter.limit("10/minute")
async def limited_endpoint(request: Request):
    return {"message": "OK"}

3. 缓存策略

from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
import redis

@app.on_event("startup")
async def startup():
    redis_client = redis.from_url("redis://localhost")
    FastAPICache.init(RedisBackend(redis_client), prefix="fastapi-cache")

@app.get("/cached")
@cache(expire=60)  # 缓存60秒
async def cached_endpoint():
    # 耗时操作
    result = expensive_operation()
    return result

4. 数据库查询优化

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

# 使用selectinload避免N+1查询
async def get_users_with_posts(db: AsyncSession):
    result = await db.execute(
        select(User)
        .options(selectinload(User.posts))
    )
    return result.scalars().all()

部署配置

Docker部署

FROM python:3.11-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制代码
COPY . .

# 使用Gunicorn + Uvicorn workers
CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]

Gunicorn配置

# gunicorn_config.py
bind = "0.0.0.0:8000"
workers = 4
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
max_requests = 10000
max_requests_jitter = 1000
timeout = 30
keepalive = 5

Nginx反向代理

upstream fastapi_backend {
    least_conn;
    server 127.0.0.1:8000;
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
    server 127.0.0.1:8003;
}

server {
    listen 80;
    server_name api.example.com;

    location / {
        proxy_pass http://fastapi_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        
        # WebSocket支持
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}

监控和日志

结构化日志

import logging
import json
from pythonjsonlogger import jsonlogger

# 配置JSON日志
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)

logger = logging.getLogger()
logger.addHandler(logHandler)
logger.setLevel(logging.INFO)

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    
    logger.info({
        "method": request.method,
        "path": request.url.path,
        "status_code": response.status_code,
        "process_time": process_time
    })
    
    return response

Prometheus指标

from prometheus_client import Counter, Histogram, generate_latest
from fastapi import Response

REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

REQUEST_DURATION = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time
    
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    
    REQUEST_DURATION.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)
    
    return response

@app.get("/metrics")
async def metrics():
    return Response(content=generate_latest(), media_type="text/plain")

错误处理

from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError

@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.detail,
            "path": request.url.path
        }
    )

@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    return JSONResponse(
        status_code=422,
        content={
            "error": "Validation error",
            "details": exc.errors()
        }
    )

性能测试结果

在4核8GB服务器上的压测结果:

  • QPS: 5000+
  • P50延迟: 15ms
  • P99延迟: 80ms
  • 并发连接: 1000+

最佳实践

  1. 使用异步I/O:充分利用FastAPI的异步特性
  2. 连接池管理:合理配置数据库连接池
  3. 监控告警:建立完善的监控体系
  4. 错误处理:统一的错误处理机制
  5. 文档完善:利用自动生成的API文档

总结

FastAPI是一个优秀的现代Web框架,通过合理的架构设计和优化配置,可以构建高性能的生产级API服务。关键是要充分利用异步特性,优化I/O操作,并建立完善的监控体系。

希望这些经验对正在部署FastAPI服务的开发者有所帮助!