找回密码
 立即注册
首页 群组 IT互联网 网站 程序园子 FastAPI与SQLAlchemy数据库集成与CRUD操作

FastAPI与SQLAlchemy数据库集成与CRUD操作

管水芸 2025-5-29 18:16:51
title: FastAPI与SQLAlchemy数据库集成与CRUD操作
date: 2025/04/16 09:50:57
updated: 2025/04/16 09:50:57
author:  cmdragon
excerpt:
FastAPI与SQLAlchemy集成基础包括环境准备、数据库连接配置和模型定义。CRUD操作通过数据访问层封装和路由层实现,确保线程安全和事务管理。常见错误如422请求验证错误通过Pydantic模型和中间件处理。Session生命周期管理依赖注入系统保证每个请求独立会话。常见报错如数据库连接失败和事务回滚通过检查服务状态、验证连接参数和异常处理解决。
categories:

  • 后端开发
  • FastAPI
tags:

  • FastAPI
  • SQLAlchemy
  • 数据库集成
  • CRUD操作
  • Session管理
  • 错误处理
  • MySQL
1.jpeg
扫描二维码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
探索数千个预构建的 AI 应用,开启你的下一个伟大创意
1. FastAPI 与 SQLAlchemy 同步数据库集成基础

1.1 环境准备与安装

首先创建虚拟环境并安装必要依赖:
  1. python -m venv venv
  2. source venv/bin/activate  # Linux/Mac
  3. venv\Scripts\activate.bat  # Windows
  4. pip install fastapi uvicorn sqlalchemy pymysql
复制代码
1.2 数据库连接配置

在database.py中配置核心数据库连接:
  1. from sqlalchemy import create_engine
  2. from sqlalchemy.orm import sessionmaker
  3. SQLALCHEMY_DATABASE_URL = "mysql+pymysql://user:password@localhost/mydatabase"
  4. engine = create_engine(
  5.     SQLALCHEMY_DATABASE_URL,
  6.     pool_size=20,
  7.     max_overflow=0,
  8.     pool_pre_ping=True
  9. )
  10. SessionLocal = sessionmaker(
  11.     autocommit=False,
  12.     autoflush=False,
  13.     bind=engine,
  14.     expire_on_commit=False
  15. )
复制代码
1.3 模型定义与关系映射

在models.py中定义数据模型:
  1. from sqlalchemy import Column, Integer, String
  2. from database import Base
  3. class User(Base):
  4.     __tablename__ = "users"
  5.     id = Column(Integer, primary_key=True, index=True)
  6.     name = Column(String(50), nullable=False)
  7.     email = Column(String(100), unique=True)
  8.     age = Column(Integer, default=18)
  9.     def __repr__(self):
  10.         return f"<User(name='{self.name}', email='{self.email}')>"
复制代码
2. CRUD 操作标准实现模式

2.1 数据访问层封装

创建repository.py实现CRUD操作:
  1. from sqlalchemy.orm import Session
  2. from models import User
  3. class UserRepository:
  4.     @staticmethod
  5.     def create_user(db: Session, user_data: dict):
  6.         """ 创建用户 """
  7.         db_user = User(**user_data)
  8.         db.add(db_user)
  9.         db.commit()
  10.         db.refresh(db_user)
  11.         return db_user
  12.     @staticmethod
  13.     def get_user(db: Session, user_id: int):
  14.         """ 根据ID获取用户 """
  15.         return db.query(User).filter(User.id == user_id).first()
  16.     @staticmethod
  17.     def update_user(db: Session, user_id: int, update_data: dict):
  18.         """ 更新用户信息 """
  19.         db_user = db.query(User).filter(User.id == user_id).first()
  20.         if db_user:
  21.             for key, value in update_data.items():
  22.                 setattr(db_user, key, value)
  23.             db.commit()
  24.             db.refresh(db_user)
  25.         return db_user
  26.     @staticmethod
  27.     def delete_user(db: Session, user_id: int):
  28.         """ 删除用户 """
  29.         db_user = db.query(User).filter(User.id == user_id).first()
  30.         if db_user:
  31.             db.delete(db_user)
  32.             db.commit()
  33.             return True
  34.         return False
复制代码
2.2 路由层实现

在main.py中定义API端点:
  1. from fastapi import Depends, FastAPI, HTTPException
  2. from sqlalchemy.orm import Session
  3. from models import Base
  4. from database import engine, SessionLocal
  5. from repository import UserRepository
  6. from pydantic import BaseModel
  7. Base.metadata.create_all(bind=engine)
  8. app = FastAPI()
  9. # 依赖注入获取数据库会话
  10. def get_db():
  11.     db = SessionLocal()
  12.     try:
  13.         yield db
  14.     finally:
  15.         db.close()
  16. class UserCreate(BaseModel):
  17.     name: str
  18.     email: str
  19.     age: int = 18
  20. @app.post("/users/")
  21. def create_user(user: UserCreate, db: Session = Depends(get_db)):
  22.     return UserRepository.create_user(db, user.dict())
  23. @app.get("/users/{user_id}")
  24. def read_user(user_id: int, db: Session = Depends(get_db)):
  25.     db_user = UserRepository.get_user(db, user_id)
  26.     if db_user is None:
  27.         raise HTTPException(status_code=404, detail="User not found")
  28.     return db_user
复制代码
3. Session 生命周期管理

3.1 Session 线程安全策略

通过依赖注入系统保证每个请求独立会话:
  1. def get_db():
  2.     db = SessionLocal()
  3.     try:
  4.         yield db
  5.     finally:
  6.         db.close()
复制代码
3.2 事务管理最佳实践
  1. def transfer_funds(db: Session, from_id: int, to_id: int, amount: float):
  2.     try:
  3.         from_user = UserRepository.get_user(db, from_id)
  4.         to_user = UserRepository.get_user(db, to_id)
  5.         if from_user.balance < amount:
  6.             raise ValueError("Insufficient funds")
  7.         from_user.balance -= amount
  8.         to_user.balance += amount
  9.         db.commit()
  10.     except Exception as e:
  11.         db.rollback()
  12.         raise e
复制代码
4. 常见错误处理

4.1 422 请求验证错误

示例场景
  1. {
  2.   "detail": [
  3.     {
  4.       "loc": [
  5.         "body",
  6.         "age"
  7.       ],
  8.       "msg": "value is not a valid integer",
  9.       "type": "type_error.integer"
  10.     }
  11.   ]
  12. }
复制代码
解决方案

  • 检查请求体是否匹配Pydantic模型定义
  • 使用OpenAPI文档进行测试
  • 添加中间件捕获验证错误:
  1. from fastapi.exceptions import RequestValidationError
  2. @app.exception_handler(RequestValidationError)
  3. async def validation_exception_handler(request, exc):
  4.     return JSONResponse(
  5.         status_code=400,
  6.         content={"detail": exc.errors(), "body": exc.body},
  7.     )
复制代码
课后Quiz

问题1:以下哪种方式可以有效防止SQL注入攻击?
A) 使用字符串拼接SQL语句
B) 使用ORM的查询参数化功能
C) 在数据库连接字符串添加特殊参数
D) 禁用所有输入验证
答案:B) 正确。SQLAlchemy等ORM框架会自动进行参数化查询,将用户输入作为参数传递而不是直接拼接到SQL语句中。
问题2:为什么需要在finally块中关闭数据库会话?
A) 为了提升查询性能
B) 确保会话在任何情况下都会被正确关闭
C) 防止其他请求使用该会话
D) 满足数据库连接池的要求
答案:B) 正确。无论是否发生异常,finally块中的代码都会执行,保证会话资源的正确释放。
常见报错解决方案

**报错1
**:sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (2003, "Can't connect to MySQL server on 'localhost'")
原因分析

  • 数据库服务未启动
  • 连接参数(用户名/密码/端口)错误
  • 网络防火墙阻止连接
解决方案

  • 检查MySQL服务状态
  • 验证连接字符串参数
  • 使用telnet测试端口连通性
  • 添加连接超时参数:
  1. create_engine(connect_args={"connect_timeout": 10})
复制代码
**报错2
**:sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush.
原因分析

  • 数据库操作违反约束(如唯一性约束)
  • 事务未正确处理异常
解决方案

  • 检查数据完整性约束
  • 在事务代码块中添加try/except
  • 执行显式回滚操作
  • 使用session.expire_all()重置会话状态
余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长,阅读完整的文章:FastAPI与SQLAlchemy数据库集成与CRUD操作 | cmdragon's Blog
往期文章归档:

<ul>FastAPI与SQLAlchemy同步数据库集成 | cmdragon's Blog
SQLAlchemy 核心概念与同步引擎配置详解 | cmdragon's Blog
FastAPI依赖注入性能优化策略 | cmdragon's Blog
FastAPI安全认证中的依赖组合 | cmdragon's Blog
FastAPI依赖注入系统及调试技巧 | cmdragon's Blog
FastAPI依赖覆盖与测试环境模拟 | cmdragon's Blog
FastAPI中的依赖注入与数据库事务管理 | cmdragon's Blog
FastAPI依赖注入实践:工厂模式与实例复用的优化策略 | cmdragon's Blog
FastAPI依赖注入:链式调用与多级参数传递 | cmdragon's Blog
FastAPI依赖注入:从基础概念到应用 | cmdragon's Blog
FastAPI中实现动态条件必填字段的实践 | cmdragon's Blog
FastAPI中Pydantic异步分布式唯一性校验 | cmdragon's Blog
掌握FastAPI与Pydantic的跨字段验证技巧 | cmdragon's Blog
FastAPI中的Pydantic密码验证机制与实现 | cmdragon's Blog
深入掌握FastAPI与OpenAPI规范的高级适配技巧 | cmdragon's Blog
Pydantic字段元数据指南:从基础到企业级文档增强 | cmdragon's Blog
Pydantic Schema生成指南:自定义JSON Schema | cmdragon's Blog
Pydantic递归模型深度校验36计:从无限嵌套到亿级数据的优化法则 | cmdragon's Blog
Pydantic异步校验器深:构建高并发验证系统 | cmdragon's Blog
Pydantic根校验器:构建跨字段验证系统 | cmdragon's Blog
Pydantic配置继承抽象基类模式 | cmdragon's Blog
Pydantic多态模型:用鉴别器构建类型安全的API接口 | cmdragon's Blog
FastAPI性能优化指南:参数解析与惰性加载 | cmdragon's Blog
FastAPI依赖注入:参数共享与逻辑复用 | cmdragon's Blog
FastAPI安全防护指南:构建坚不可摧的参数处理体系 | cmdragon's Blog
FastAPI复杂查询终极指南:告别if-else的现代化过滤架构 | cmdragon's Blog
FastAPI 核心机制:分页参数的实现与最佳实践 | cmdragon's Blog
FastAPI 错误处理与自定义错误消息完全指南:构建健壮的 API 应用
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!