Python(FastAPI) 路由匹配中使用ORM实现查询功能
·


从建表到路由到查询代码
from fastapi import FastAPI,Depends
from datetime import datetime
from sqlalchemy import String, DateTime, func,select
from sqlalchemy.ext.asyncio import create_async_engine,async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
app = FastAPI()
# 1.创建异步引擎
#ASYNC_DATAASE_URL为数据库的地址
ASYNC_DATAASE_URL = "mysql+aiomysql://root:630229@localhost:3306/fast_api_db?charset=utf8"
async_engine = create_async_engine(
ASYNC_DATAASE_URL,
echo=True, # 可选删除SQL日志
pool_size=10, #设置连接池活跃的连接数
max_overflow=20, # 允许额外的连接数
)
# 2.定义模型类: 基类+表对应的模型类
# 基类:创建时间、更新时间;书籍表:id、书名、作者、价格、出版社
class Base(DeclarativeBase):
create_time: Mapped[datetime] = mapped_column(
DateTime,
insert_default=func.now(),
default=datetime.now,
comment="创建时间"
)
update_time: Mapped[datetime] = mapped_column(
DateTime,
insert_default=func.now(),
onupdate=func.now(),
default=datetime.now,
comment="修改时间"
)
class Book(Base):
__tablename__ = "book"
id: Mapped[int] = mapped_column(primary_key=True, comment="书籍id")
bookname: Mapped[str] = mapped_column(String(255), comment="书名")
author: Mapped[str] = mapped_column(String(255), comment="作者")
price: Mapped[float] = mapped_column(comment="价格")
publisher: Mapped[str] = mapped_column(String(255), comment="出版社")
# 3.启动应用时建表,FASTAPI启动时调用建表的函数
async def create_tables():
#创建异步引擎,创建事务建表
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all) #模型类的元数据
@app.on_event("startup")
async def startup_event():
await create_tables()
@app.get("/")
async def root():
return {"message": "Hello World"}
# 路由匹配中使用 ORM ,创建依赖项,使用 Depends注入到处理函数
# 需求:查询功能的接口,查询图书=》依赖注入:创建依赖项获取数据库会话+Depends注入路由处理函数
# 4.创建异步会话工厂
AsyncSessionLocal = async_sessionmaker(
bind=async_engine, # 绑定异步引擎
class_=AsyncSession, # 设置会话类为异步会话
expire_on_commit=False # 设置会话提交时是否自动刷新
)
# 5.依赖项:获取数据库会话
async def get_database():
async with AsyncSessionLocal() as session:
try:
yield session # 返回数据库会话
await session.commit() # 提交事务
except Exception:
await session.rollback() # 回滚事务
raise
finally:
await session.close() # 关闭会话
# 6.路由:查询图书
@app.get("/book/books")
async def get_books_list(session: AsyncSession = Depends(get_database)):
result = await session.execute(select(Book))
books = result.scalars().all()
return books
完美
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)