从API到数据库,手把手教你打造一个能干活、能背锅的Agent
从API到数据库,手把手教你打造一个能干活、能背锅的Agent
为什么Agent需要学这些?
你养的AI如果只会聊闲天,那它就是个“嘴炮王者”。真让它帮你干活——查订单、发消息、记用户偏好……它立马变人工智障。
别慌!今天我们就给Agent装上三件套:API遥控器(RESTful/GraphQL)、实时通知系统(Webhook/SSE)、记忆仓库(SQL+Redis)。学完之后,你的Agent能从“键盘侠”进化成“全能打工人”。
准备好了吗?系好安全带,老司机要飙车了!
一、RESTful API 与 GraphQL 集成——让Agent学会“点外卖”和“收快递”
1.1 RESTful API:点套餐的快递小哥
通俗类比
-
GET:你打电话问快递员“我的包裹到哪了?”(只查不修改)
-
POST:你寄一个新包裹(创建资源)
-
PUT:你改收件地址(整体更新)
-
DELETE:你退货(删除资源)
-
路径参数:快递单号
https://api.com/order/123456 -
查询参数:筛选条件
?page=1&size=10 -
请求体:包裹里的东西(JSON数据)
-
Token认证:小区门禁卡,没有卡连门都进不去。
实战场景:电商商品查询 + 创建订单
代码示例1:GET请求(查商品列表 + 商品详情)
# -*- coding: utf-8 -*-
"""
场景:电商API对接 - 查询商品
API提供商:这里使用免费的FakeStore API(https://fakestoreapi.com)
不用注册,直接调,良心!
"""
import requests # 专门发HTTP请求的库,相当于快递员的小三轮
# ------------------- 1. 查询商品列表(带查询参数) -------------------
def get_products(category=None, limit=10):
"""
获取商品列表,支持按分类筛选和数量限制
:param category: 分类名,如 "electronics", "jewelry"
:param limit: 最多返回多少个商品
"""
base_url = "https://fakestoreapi.com/products"
# 构造查询参数:相当于在快递单上写“只要电子产品,只要10件”
params = {}
if category:
# 如果指定分类,API路径变成 /products/category/xxx
url = f"{base_url}/category/{category}"
else:
url = base_url
params["limit"] = limit # 查询参数加到问号后面
# 发送GET请求:headers可以加User-Agent,有些API会检查
headers = {"User-Agent": "MyAgent/1.0"} # 假装自己是浏览器
try:
resp = requests.get(url, params=params, headers=headers, timeout=10)
# timeout=10 表示最多等10秒,防止卡死(就像快递小哥等太久就不等了)
# 检查HTTP状态码:200表示成功,404表示没找到,500表示服务器炸了
if resp.status_code == 200:
# .json() 自动把返回的JSON字符串解析成Python列表/字典
products = resp.json()
print(f"✅ 查到了 {len(products)} 个商品")
return products
else:
print(f"❌ 请求失败,状态码:{resp.status_code}")
return []
except requests.exceptions.Timeout:
print("⏰ 请求超时,快递小哥可能堵车了")
return []
except Exception as e:
print(f"💥 未知错误:{e}")
return []
# ------------------- 2. 查询单个商品(路径参数) -------------------
def get_product_detail(product_id):
"""路径参数直接拼在URL里,就像快递单号"""
url = f"https://fakestoreapi.com/products/{product_id}"
resp = requests.get(url, timeout=10)
if resp.status_code == 200:
product = resp.json()
print(f"商品名:{product['title']}")
print(f"价格:${product['price']}")
return product
else:
print(f"商品{product_id}不存在或已下架")
return None
# ------------------- 3. 创建订单(POST请求 + 请求体 + Token认证) -------------------
def create_order(user_token, product_id, quantity):
"""
模拟创建订单,需要用户Token(就像门禁卡)
真实场景中Token从登录接口获得,放在Header的Authorization里
"""
url = "https://fakestoreapi.com/carts" # 这是个假接口,仅演示结构
# 请求体:要寄的“包裹”内容,格式通常是JSON
order_data = {
"userId": 1, # 假设用户ID
"date": "2025-04-16",
"products": [{"productId": product_id, "quantity": quantity}]
}
# 请求头:告诉服务器我发的是JSON,并且带上Token
headers = {
"Content-Type": "application/json", # 告诉服务器,我给你的是JSON格式
"Authorization": f"Bearer {user_token}" # Token认证标准写法
}
resp = requests.post(url, json=order_data, headers=headers, timeout=10)
# 注意:用 json= 参数会自动序列化字典并设置Content-Type,比手动data=json.dumps()方便
if resp.status_code == 201: # 201表示资源创建成功
print("🎉 订单创建成功!订单号:", resp.json().get("id"))
return resp.json()
else:
print(f"❌ 创建失败:{resp.status_code} - {resp.text}")
return None
# ------------------- 运行示例 -------------------
if __name__ == "__main__":
# 先查商品列表
products = get_products(category="electronics", limit=3)
if products:
first_id = products[0]["id"]
# 再查详情
get_product_detail(first_id)
# 模拟Token(实际应从登录获取)
fake_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
create_order(fake_token, product_id=1, quantity=2)
避坑指南:
-
路径参数和查询参数别搞混:
/products/123是路径参数;/products?category=shoes是查询参数。 -
Token不要写死在代码里!应该从配置或登录响应中获取。
-
生产环境必须用环境变量存Token,千万别提交到Git。
1.2 GraphQL:点外卖只点自己爱吃的
类比:RESTful是套餐(服务器给你一堆字段,哪怕你只要一个名字);GraphQL是自助餐(你想吃啥就拿啥,只拿需要的,绝不浪费)。
核心原理:客户端发一个查询字符串(描述想要哪些字段),服务器返回精确的JSON。
对比:
-
RESTful:多个端点(
/users/123、/users/123/posts)→ 可能需要多次请求。 -
GraphQL:一个端点(
/graphql) → 一次请求拿所有想要的数据。
代码示例2:调用GraphQL API(查询用户信息 + 文章列表)
我们使用GitHub的GraphQL API(需要Token,但可以申请免费)。另一个示例用公共的“国家信息”API。
# -*- coding: utf-8 -*-
"""
场景1:使用GraphQL查询国家信息(公开API,无需Token)
API地址:https://countries.trevorblades.com
"""
import requests
def graphql_query_countries(continent_code="AS"):
"""
查询亚洲国家列表,只返回国家名称和首都
GraphQL查询像点菜:你明确写出要哪些字段
"""
url = "https://countries.trevorblades.com/"
# GraphQL查询字符串:花括号里就是你想要的“菜”
query = """
{
continent(code: "%s") {
name
countries {
name
capital
}
}
}
""" % continent_code
# 标准的GraphQL请求体格式
payload = {
"query": query,
"variables": None # 变量(如果有)可以放这里
}
headers = {"Content-Type": "application/json"}
resp = requests.post(url, json=payload, headers=headers, timeout=10)
if resp.status_code == 200:
data = resp.json()
# 结构:data.continent.countries
countries = data["data"]["continent"]["countries"]
for c in countries[:5]:
print(f"国家:{c['name']},首都:{c.get('capital', '无')}")
return data
else:
print(f"GraphQL查询失败:{resp.text}")
return None
# 场景2:GitHub GraphQL示例(需Token,这里演示结构)
def graphql_github_user(username, token):
"""
查询GitHub用户信息,只拿登录名、仓库名和星标数
需要GitHub Personal Access Token(去Settings->Developer settings生成)
"""
url = "https://api.github.com/graphql"
query = """
query($login: String!) {
user(login: $login) {
login
name
repositories(first: 5) {
nodes {
name
stargazerCount
}
}
}
}
"""
variables = {"login": username}
payload = {"query": query, "variables": variables}
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
resp = requests.post(url, json=payload, headers=headers, timeout=15)
if resp.status_code == 200:
user_data = resp.json()["data"]["user"]
print(f"用户:{user_data['login']},姓名:{user_data.get('name')}")
for repo in user_data["repositories"]["nodes"]:
print(f"仓库:{repo['name']},⭐ {repo['stargazerCount']}")
return user_data
else:
print(f"错误:{resp.status_code} - {resp.text}")
return None
if __name__ == "__main__":
# 测试1:国家信息
graphql_query_countries("AS") # AS=亚洲
# 测试2:GitHub(需要真实token,这里仅演示代码结构)
# graphql_github_user("octocat", "ghp_your_token_here")
GraphQL的优点:一次请求拿多个资源,减少网络往返;字段按需索取,省流量。
缺点:缓存复杂(不像RESTful每个URL可以单独缓存),查询可能很深导致性能问题。
什么时候用GraphQL?
-
移动端网络环境差,需要减少请求次数。
-
前端需要灵活控制字段(比如不同页面展示不同信息)。
-
后端有多个数据源聚合。
二、Webhook 与 SSE 实时响应——让Agent从“轮询”变“被叫”
2.1 Webhook:快递到站了,主动通知你
类比:你寄了个快递,不想一直刷物流页面。你给快递公司留了个电话,快递一到站就给你打电话——这就是Webhook。
流程:第三方系统(比如支付平台)发生事件(订单支付成功),主动向你的服务器(你提供的URL)发一个HTTP POST请求,带上事件数据。你收到后处理业务(比如更新订单状态)。
实战场景:接收支付回调、接收电商订单状态变更。
代码示例3:实现一个Webhook接收端点(使用Flask)
# -*- coding: utf-8 -*-
"""
场景:模拟支付平台回调通知
我们用Flask写一个简单的Webhook接收服务
需要安装:pip install flask
"""
from flask import Flask, request, jsonify
import hashlib
import hmac
import json
app = Flask(__name__)
# 假设这是你与第三方约定的签名密钥(像钥匙,双方都有)
WEBHOOK_SECRET = "my_super_secret_key_123"
def verify_signature(payload_body, signature_header, secret):
"""
验证请求签名,防止伪造回调(就像快递员对暗号)
常见方法:HMAC-SHA256
"""
# 构造签名:用secret对请求体进行HMAC
computed = hmac.new(
secret.encode('utf-8'),
payload_body,
hashlib.sha256
).hexdigest()
# 比较计算出的签名和头部的签名(应该用安全比较,这里简单演示)
return hmac.compare_digest(computed, signature_header)
@app.route('/webhook/payment', methods=['POST'])
def payment_webhook():
"""
支付结果回调接口
第三方会POST到这里,携带订单信息和签名
"""
# 1. 获取原始请求体(字节串)—— 签名计算需要原始数据
raw_body = request.get_data()
# 2. 获取签名头(不同平台头名称不同,常见 X-Signature)
signature = request.headers.get('X-Signature', '')
if not signature:
return jsonify({"error": "Missing signature"}), 401
# 3. 验证签名
if not verify_signature(raw_body, signature, WEBHOOK_SECRET):
print("❌ 签名验证失败,可能是伪造请求")
return jsonify({"error": "Invalid signature"}), 403
# 4. 解析JSON数据
try:
data = json.loads(raw_body)
except:
return jsonify({"error": "Invalid JSON"}), 400
# 5. 处理业务逻辑(例如更新订单状态)
order_id = data.get('order_id')
status = data.get('status') # 'paid', 'failed'
print(f"收到支付回调:订单 {order_id} 状态 {status}")
# 这里可以调用你的Agent更新数据库、发邮件等
# 注意:回调可能重复发送,需要做幂等处理(比如检查订单状态是否已处理)
# 6. 返回成功响应(告诉第三方“我收到了”,通常返回200)
return jsonify({"code": 0, "message": "OK"}), 200
# 另一个Webhook场景:电商订单状态变更(发货通知)
@app.route('/webhook/order_shipped', methods=['POST'])
def order_shipped_webhook():
"""
接收订单发货通知
"""
raw_body = request.get_data()
# 这里签名验证逻辑类似,省略
data = json.loads(raw_body)
order_id = data.get('order_id')
tracking_number = data.get('tracking_number')
print(f"📦 订单 {order_id} 已发货,运单号:{tracking_number}")
# 可触发消息推送给用户
return "OK", 200
if __name__ == '__main__':
# 启动服务,监听在0.0.0.0:5000,外网可访问(需要内网穿透工具如ngrok)
app.run(host='0.0.0.0', port=5000, debug=True)
避坑:
-
签名验证必须做!否则任何人都可以伪造回调搞乱你的系统。
-
幂等性:同一个回调可能发多次(网络抖动),你的处理逻辑要能重复执行而不出错(比如先查订单状态,已处理就忽略)。
-
异步处理:回调里不要做耗时操作,应该立即返回200,然后丢到消息队列慢慢处理,否则第三方会超时重试。
-
内网穿透:本地开发时,第三方无法访问你的localhost,需要用ngrok、frp等工具暴露公网URL。
2.2 SSE:实时弹幕,主播主动推
类比:你在直播间,主播说话,你不需要一直刷新页面,主播一开口你就能听到——这就是Server-Sent Events(SSE)。
对比WebSocket:WebSocket是双向高速通道(像视频通话),SSE是单向广播(像收音机)。如果只是服务器推送(比如实时股价、通知),SSE更简单。
实战场景:Agent实时接收后台任务进度、新订单提醒。
代码示例4:SSE服务器(Flask) + 客户端(HTML/JS)
# -*- coding: utf-8 -*-
"""
SSE服务器端:实时推送天气更新
使用Flask,需要安装 flask 和 flask_cors(跨域)
"""
from flask import Flask, Response, request
from flask_cors import CORS
import time
import json
import random
app = Flask(__name__)
CORS(app) # 允许跨域,方便前端测试
def event_stream():
"""生成SSE事件流,这是一个生成器函数,yield每个事件"""
while True:
# 模拟实时数据:比如每2秒推送一条新订单通知
time.sleep(2)
# 构造一个随机订单消息
order = {
"order_id": random.randint(1000, 9999),
"amount": round(random.uniform(10, 500), 2),
"status": "new"
}
# SSE格式:data: 后面跟JSON字符串,以两个换行结束
yield f"data: {json.dumps(order)}\n\n"
@app.route('/stream/orders')
def stream_orders():
"""SSE推送端点"""
# 设置响应头,告诉浏览器这是SSE流
return Response(
event_stream(),
mimetype="text/event-stream", # 必须的类型
headers={
"Cache-Control": "no-cache", # 禁止缓存
"X-Accel-Buffering": "no" # 防止nginx缓冲
}
)
# 另一个场景:实时推送Agent思考过程
def agent_thought_stream():
"""模拟Agent一步步思考并推送给前端"""
steps = ["正在拆解问题...", "查询天气API...", "生成总结...", "写入文件...", "完成!"]
for step in steps:
time.sleep(1.5)
yield f"data: {json.dumps({'step': step})}\n\n"
@app.route('/stream/agent-thought')
def stream_agent_thought():
return Response(agent_thought_stream(), mimetype="text/event-stream")
if __name__ == '__main__':
app.run(port=5000, debug=True)
客户端HTML代码(保存为sse_client.html,双击用浏览器打开):
<!DOCTYPE html>
<html>
<head><title>SSE实时订单</title></head>
<body>
<h2>📦 实时新订单推送</h2>
<ul id="orders"></ul>
<script>
// 创建EventSource对象,指向SSE接口
const evtSource = new EventSource("http://localhost:5000/stream/orders");
// 监听消息事件
evtSource.onmessage = function(event) {
// event.data 就是服务器发来的字符串
const order = JSON.parse(event.data);
const li = document.createElement("li");
li.textContent = `订单 ${order.order_id},金额 ${order.amount} 元,状态 ${order.status}`;
document.getElementById("orders").appendChild(li);
};
// 处理错误(比如断线重连)
evtSource.onerror = function(e) {
console.log("连接出错,尝试重连...");
};
</script>
</body>
</html>
SSE vs WebSocket:
-
SSE:简单,基于HTTP,自动重连,只能服务器→客户端。适合通知、股票、日志流。
-
WebSocket:双向,持久连接,协议复杂。适合聊天、游戏。
避坑:
-
SSE最大并发连接数受限于浏览器(通常6个),生产环境可以用nginx代理。
-
如果客户端断开,服务器端的生成器函数会抛出异常,需要捕获并退出循环。
三、数据库对接——给Agent装上“仓库”和“冰箱”
3.1 SQL数据库(MySQL):大仓库,存所有正经东西
类比:数据库就像公司的大仓库,SQL就是仓库管理员的口令。
-
SELECT:你喊“把库存表里数量大于10的货拿出来” -
INSERT:你喊“存一箱矿泉水到货架C” -
UPDATE:你喊“把矿泉水价格改成2元” -
DELETE:你喊“过期商品扔掉” -
参数绑定:相当于告诉管理员“我要找名字叫‘张三’的用户”,而不是“我要找名字叫
'; DROP TABLE users;的人”,防止SQL注入(仓库管理员乱开门)。
实战场景:用户管理(增删改查) + 订单查询。
代码示例5:对接MySQL(使用pymysql)
首先安装:pip install pymysql(纯Python驱动,免编译)
# -*- coding: utf-8 -*-
"""
场景:用户管理模块(注册、登录、修改信息、删除)
数据库:MySQL(本地或云)
建表SQL(先手动执行):
CREATE DATABASE agent_demo;
USE agent_demo;
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
email VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
import pymysql
from pymysql.cursors import DictCursor # 让查询结果返回字典,方便用列名访问
# 数据库连接配置(像仓库地址和钥匙)
DB_CONFIG = {
"host": "localhost", # 数据库IP
"port": 3306, # MySQL默认端口
"user": "root", # 用户名
"password": "your_password", # 密码,千万别提交到代码!
"database": "agent_demo",
"charset": "utf8mb4",
"cursorclass": DictCursor # 结果返回字典
}
def get_db_connection():
"""获取数据库连接(每次操作都新建连接,用完记得关)"""
return pymysql.connect(**DB_CONFIG)
# ------------------- 1. 插入新用户(INSERT) -------------------
def create_user(username, password_hash, email):
"""注册用户,使用参数绑定防注入"""
conn = get_db_connection()
try:
with conn.cursor() as cursor:
# 参数绑定:用 %s 占位,然后传入元组,pymysql会正确转义
sql = "INSERT INTO users (username, password_hash, email) VALUES (%s, %s, %s)"
cursor.execute(sql, (username, password_hash, email))
conn.commit() # 提交事务(存到硬盘)
print(f"✅ 用户 {username} 创建成功")
return True
except pymysql.err.IntegrityError as e:
print(f"❌ 用户名已存在或邮箱重复:{e}")
return False
finally:
conn.close()
# ------------------- 2. 查询用户(SELECT) -------------------
def get_user_by_username(username):
"""根据用户名查用户信息"""
conn = get_db_connection()
try:
with conn.cursor() as cursor:
sql = "SELECT id, username, email, created_at FROM users WHERE username = %s"
cursor.execute(sql, (username,))
user = cursor.fetchone() # 获取一条记录,返回字典
if user:
print(f"找到用户:{user['username']},邮箱:{user['email']}")
else:
print("用户不存在")
return user
finally:
conn.close()
# ------------------- 3. 更新用户邮箱(UPDATE) -------------------
def update_email(username, new_email):
"""修改用户邮箱"""
conn = get_db_connection()
try:
with conn.cursor() as cursor:
sql = "UPDATE users SET email = %s WHERE username = %s"
affected = cursor.execute(sql, (new_email, username))
conn.commit()
if affected:
print(f"✉️ 用户 {username} 邮箱已更新为 {new_email}")
else:
print("用户不存在,更新失败")
return affected > 0
finally:
conn.close()
# ------------------- 4. 删除用户(DELETE) -------------------
def delete_user(username):
"""删除用户(慎用)"""
conn = get_db_connection()
try:
with conn.cursor() as cursor:
sql = "DELETE FROM users WHERE username = %s"
affected = cursor.execute(sql, (username,))
conn.commit()
if affected:
print(f"🗑️ 用户 {username} 已删除")
else:
print("用户不存在")
return affected > 0
finally:
conn.close()
# ------------------- 5. 事务示例:批量插入订单(要么全成功,要么全回滚) -------------------
def batch_create_orders(orders):
"""
orders: 列表,每个元素是 (user_id, product, amount)
模拟批量下单,其中一个失败就全部取消
"""
conn = get_db_connection()
try:
conn.begin() # 开始事务
with conn.cursor() as cursor:
for order in orders:
sql = "INSERT INTO orders (user_id, product, amount) VALUES (%s, %s, %s)"
cursor.execute(sql, order)
conn.commit() # 全部成功才提交
print("🎉 批量下单成功")
except Exception as e:
conn.rollback() # 出错就回滚,就像没发生过
print(f"💥 批量下单失败,已回滚:{e}")
finally:
conn.close()
# 运行示例
if __name__ == "__main__":
# 注意:先确保数据库和表存在
create_user("alice", "hashed_pw_123", "alice@example.com")
user = get_user_by_username("alice")
if user:
update_email("alice", "newalice@example.com")
# delete_user("alice")
避坑:
-
SQL注入:永远不要拼接字符串!用参数绑定(
%s占位)。 -
连接管理:用完必须
close(),或者用with上下文自动关。 -
事务:多个相关操作要包在事务里,保证原子性。
-
密码存储:绝对不要存明文,要用bcrypt等哈希。
3.2 NoSQL(Redis):冰箱,放经常用的东西
类比:Redis就像你家的冰箱,牛奶、鸡蛋这些天天用的东西放冰箱,不用每次去大仓库(SQL)翻。冰箱还有“过期自动扔”的功能(TTL)。
实战场景:缓存商品详情、存储用户会话、Agent短期记忆。
代码示例6:对接Redis(缓存商品数据 + 存储用户Token)
安装:pip install redis
# -*- coding: utf-8 -*-
"""
场景1:缓存数据库查询结果(避免频繁查SQL)
场景2:存储用户登录状态(Session)
需要先启动Redis:redis-server
"""
import redis
import json
import time
# 连接Redis(默认本地,端口6379)
r = redis.Redis(
host='localhost',
port=6379,
db=0, # 数据库索引,默认0-15
decode_responses=True # 自动将返回的字节串解码为字符串
)
# ------------------- 场景1:缓存商品详情 -------------------
def get_product_from_db(product_id):
"""模拟从MySQL查询商品(很慢)"""
print("🐢 查询数据库...")
time.sleep(1) # 假装慢查询
return {"id": product_id, "name": "手机", "price": 2999, "stock": 100}
def get_product_with_cache(product_id):
"""先查Redis缓存,没有再去数据库,并写入缓存"""
cache_key = f"product:{product_id}"
# 从Redis取
cached = r.get(cache_key)
if cached:
print("🎯 命中缓存,秒开!")
return json.loads(cached) # 存的是JSON字符串,要解析
else:
print("😭 缓存未命中,去数据库查")
product = get_product_from_db(product_id)
# 存入Redis,并设置过期时间(比如600秒)
r.setex(cache_key, 600, json.dumps(product))
print("📦 已写入缓存,有效期10分钟")
return product
# ------------------- 场景2:存储用户Token(会话) -------------------
def store_user_session(user_id, token, expire_seconds=3600):
"""用户登录成功,将token存入Redis,设置过期时间"""
session_key = f"session:{token}"
# 可以用Hash存储更多信息,这里简单存user_id
r.setex(session_key, expire_seconds, user_id)
print(f"🔑 用户{user_id}的会话已存入,有效期{expire_seconds}秒")
def get_user_id_by_token(token):
"""根据token获取用户ID(验证登录)"""
user_id = r.get(f"session:{token}")
if user_id:
print(f"✅ 有效token,用户ID:{user_id}")
return user_id
else:
print("❌ token无效或已过期")
return None
# ------------------- 场景3:Redis做分布式锁(防止重复处理) -------------------
def process_order(order_id):
"""模拟处理订单,用Redis锁防止并发重复处理"""
lock_key = f"lock:order:{order_id}"
# setnx(set if not exists)加锁,设置10秒自动释放(防止死锁)
acquired = r.setnx(lock_key, "locked")
if acquired:
r.expire(lock_key, 10) # 设置过期时间,防止死锁
try:
print(f"🔒 获得锁,处理订单{order_id}...")
# 实际业务逻辑
time.sleep(2)
print(f"✅ 订单{order_id}处理完成")
finally:
r.delete(lock_key) # 释放锁
print("🔓 锁已释放")
else:
print(f"⚠️ 订单{order_id}正在被其他线程处理,稍后重试")
# ------------------- 运行 -------------------
if __name__ == "__main__":
# 测试缓存
print(get_product_with_cache(1))
print(get_product_with_cache(1)) # 第二次应该命中缓存
# 测试会话
store_user_session(123, "token_abc")
get_user_id_by_token("token_abc")
get_user_id_by_token("fake_token")
# 测试锁(多线程环境下,这里简单模拟)
process_order(1001)
process_order(1001) # 第二次会失败
Redis常用数据类型:
-
String:缓存值、计数器
-
Hash:存储对象(如用户信息)
-
List:消息队列
-
Set:标签、去重
-
Sorted Set:排行榜
缓存策略:
-
过期时间:必须设置,否则内存会爆。
-
缓存更新:数据变更时要主动删除或更新缓存(比如修改商品价格后,
r.delete("product:1"))。 -
缓存穿透:查询不存在的数据,每次都穿透到DB。解决方案:缓存空值并设置短过期时间。
-
缓存击穿:热点key过期瞬间大量请求打到DB。解决方案:互斥锁(如上面的分布式锁)。
四、总结与作业
恭喜你!你的Agent现在会:
-
优雅地调用RESTful和GraphQL API(点外卖收快递)
-
接收Webhook回调并验证签名(等快递通知)
-
实时推送SSE消息(当主播)
-
把重要数据存MySQL(大仓库)
-
把热点数据放Redis(冰箱)
课后作业:
-
用RESTful API对接一个真实电商接口(比如京东联盟),实现商品搜索。
-
用GraphQL查询GitHub的issues列表,只返回标题和作者。
-
实现一个Webhook接收GitHub的push事件,自动拉取代码(假装一下)。
-
用SSE做一个实时股票价格展示页面。
-
把用户登录状态同时存MySQL和Redis,实现带缓存的认证。
最后一句:代码就像乐高,照着图纸拼出来只是第一步。试着改参数、加功能、甚至故意搞破坏,才能真正理解它怎么工作的。遇到bug别慌,那是代码在教你成长。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)