Python安全编程实践
Python安全编程实践
一、输入验证与清洗
永远不要信任外部输入。所有来自用户、文件、网络的数据都需要验证。
import re
from typing import Optional
class InputValidator:
@staticmethod
def sanitize_string(value: str, max_length: int = 255) -> str:
"""清洗字符串输入"""
if not isinstance(value, str):
raise ValueError("输入必须是字符串")
# 去除首尾空白
value = value.strip()
# 限制长度
if len(value) > max_length:
raise ValueError(f"输入长度不能超过{max_length}")
# 移除控制字符
value = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', value)
return value
@staticmethod
def validate_email(email: str) -> str:
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
email = email.strip().lower()
if not re.match(pattern, email):
raise ValueError("邮箱格式不正确")
return email
@staticmethod
def validate_url(url: str, allowed_schemes=('http', 'https')) -> str:
from urllib.parse import urlparse
parsed = urlparse(url)
if parsed.scheme not in allowed_schemes:
raise ValueError(f"不允许的URL协议: {parsed.scheme}")
if not parsed.netloc:
raise ValueError("URL缺少主机名")
return url
@staticmethod
def validate_path(path: str, base_dir: str) -> str:
"""防止路径遍历攻击"""
import os
# 解析为绝对路径
abs_path = os.path.abspath(os.path.join(base_dir, path))
# 确保在允许的目录内
if not abs_path.startswith(os.path.abspath(base_dir)):
raise ValueError("路径遍历攻击检测")
return abs_path
二、SQL注入防护
import sqlite3
# 错误:字符串拼接(易受SQL注入)
def get_user_unsafe(username):
query = f"SELECT * FROM users WHERE name = '{username}'"
# 输入: ' OR '1'='1 会返回所有用户
cursor.execute(query)
# 正确:参数化查询
def get_user_safe(username):
cursor.execute("SELECT * FROM users WHERE name = ?", (username,))
return cursor.fetchone()
# SQLAlchemy ORM(自动参数化)
def get_user_orm(username):
return session.query(User).filter(User.name == username).first()
# 需要动态构建查询时的安全方式
def search_users(filters: dict):
conditions = []
params = []
allowed_fields = {'name', 'email', 'age', 'city'}
for field, value in filters.items():
if field not in allowed_fields:
raise ValueError(f"不允许的查询字段: {field}")
conditions.append(f"{field} = ?")
params.append(value)
query = "SELECT * FROM users"
if conditions:
query += " WHERE " + " AND ".join(conditions)
cursor.execute(query, params)
return cursor.fetchall()
三、XSS防护
import html
from markupsafe import Markup, escape
# HTML转义
def safe_output(user_input):
"""转义HTML特殊字符"""
return html.escape(user_input)
# 输入:
# 输出: <script>alert('xss')</script>
# 使用markupsafe(Flask/Jinja2内置)
safe = escape("")
# 允许部分HTML的安全清洗
import bleach
def sanitize_html(content):
"""只允许安全的HTML标签"""
allowed_tags = ['p', 'br', 'strong', 'em', 'a', 'ul', 'ol', 'li']
allowed_attrs = {'a': ['href', 'title']}
return bleach.clean(
content,
tags=allowed_tags,
attributes=allowed_attrs,
strip=True
)
# Content-Security-Policy头
def add_security_headers(response):
response.headers['Content-Security-Policy'] = "default-src 'self'"
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-XSS-Protection'] = '1; mode=block'
return response
四、密码安全
import hashlib
import secrets
import bcrypt
# 错误:明文存储或简单哈希
def hash_password_unsafe(password):
return hashlib.md5(password.encode()).hexdigest() # 不安全!
# 正确:使用bcrypt
def hash_password(password: str) -> str:
"""安全地哈希密码"""
salt = bcrypt.gensalt(rounds=12)
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
def verify_password(password: str, hashed: str) -> bool:
"""验证密码"""
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
# 生成安全的随机令牌
def generate_token(length=32) -> str:
return secrets.token_urlsafe(length)
def generate_api_key() -> str:
return secrets.token_hex(32)
# 安全的密码策略验证
def validate_password_strength(password: str) -> tuple[bool, list[str]]:
errors = []
if len(password) < 8:
errors.append("密码至少8个字符")
if not re.search(r'[A-Z]', password):
errors.append("需要至少一个大写字母")
if not re.search(r'[a-z]', password):
errors.append("需要至少一个小写字母")
if not re.search(r'\d', password):
errors.append("需要至少一个数字")
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
errors.append("需要至少一个特殊字符")
return len(errors) == 0, errors
五、加密与签名
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
import hmac
import hashlib
# 对称加密(Fernet)
class SymmetricEncryption:
def __init__(self, key=None):
self.key = key or Fernet.generate_key()
self.cipher = Fernet(self.key)
def encrypt(self, data: str) -> str:
return self.cipher.encrypt(data.encode()).decode()
def decrypt(self, token: str) -> str:
return self.cipher.decrypt(token.encode()).decode()
# HMAC签名
def sign_message(message: str, secret: str) -> str:
"""生成消息签名"""
return hmac.new(
secret.encode(),
message.encode(),
hashlib.sha256
).hexdigest()
def verify_signature(message: str, signature: str, secret: str) -> bool:
"""验证消息签名(使用恒定时间比较防止时序攻击)"""
expected = sign_message(message, secret)
return hmac.compare_digest(signature, expected)
# JWT令牌
import jwt
from datetime import datetime, timedelta
class TokenManager:
def __init__(self, secret_key, algorithm='HS256'):
self.secret_key = secret_key
self.algorithm = algorithm
def create_token(self, payload: dict, expires_hours=24) -> str:
data = payload.copy()
data['exp'] = datetime.utcnow() + timedelta(hours=expires_hours)
data['iat'] = datetime.utcnow()
return jwt.encode(data, self.secret_key, algorithm=self.algorithm)
def verify_token(self, token: str) -> Optional[dict]:
try:
return jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
except jwt.ExpiredSignatureError:
raise ValueError("令牌已过期")
except jwt.InvalidTokenError:
raise ValueError("无效的令牌")
六、安全的文件操作
import os
import tempfile
class SecureFileHandler:
ALLOWED_EXTENSIONS = {'.txt', '.pdf', '.png', '.jpg', '.csv'}
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
@classmethod
def validate_upload(cls, filename: str, file_size: int) -> str:
"""验证上传文件"""
# 检查文件扩展名
ext = os.path.splitext(filename)[1].lower()
if ext not in cls.ALLOWED_EXTENSIONS:
raise ValueError(f"不允许的文件类型: {ext}")
# 检查文件大小
if file_size > cls.MAX_FILE_SIZE:
raise ValueError(f"文件大小超过限制: {cls.MAX_FILE_SIZE} bytes")
# 生成安全的文件名(避免路径遍历)
safe_name = secrets.token_hex(16) + ext
return safe_name
@staticmethod
def safe_write(filepath: str, content: bytes):
"""原子写入(防止写入中断导致文件损坏)"""
dir_name = os.path.dirname(filepath)
fd, tmp_path = tempfile.mkstemp(dir=dir_name)
try:
os.write(fd, content)
os.fsync(fd)
os.close(fd)
os.replace(tmp_path, filepath) # 原子操作
except Exception:
os.close(fd)
os.unlink(tmp_path)
raise
七、防止命令注入
import subprocess
import shlex
# 错误:使用shell=True和字符串拼接
def run_command_unsafe(user_input):
os.system(f"echo {user_input}") # 危险!
# 输入: "; rm -rf /" 会执行删除命令
# 正确:使用列表参数,不使用shell
def run_command_safe(filename):
"""安全地执行外部命令"""
# 验证输入
if not re.match(r'^[\w\-. ]+$', filename):
raise ValueError("文件名包含非法字符")
result = subprocess.run(
['cat', filename], # 列表形式,不经过shell解析
capture_output=True,
text=True,
timeout=30,
check=True
)
return result.stdout
# 如果必须使用shell,使用shlex.quote转义
def run_with_shell(user_input):
safe_input = shlex.quote(user_input)
subprocess.run(f"echo {safe_input}", shell=True)
八、敏感数据处理
import os
from dataclasses import dataclass, field
# 环境变量管理敏感配置
class SecureConfig:
def __init__(self):
self.db_password = os.environ.get('DB_PASSWORD')
self.api_key = os.environ.get('API_KEY')
self.secret_key = os.environ.get('SECRET_KEY')
if not all([self.db_password, self.api_key, self.secret_key]):
raise EnvironmentError("缺少必要的环境变量")
def __repr__(self):
"""防止敏感信息泄露到日志"""
return "SecureConfig(***)"
# 敏感字段不序列化
@dataclass
class User:
name: str
email: str
password_hash: str = field(repr=False) # 不出现在repr中
def to_dict(self):
"""排除敏感字段"""
return {'name': self.name, 'email': self.email}
# 安全的日志记录
import logging
class SensitiveFilter(logging.Filter):
SENSITIVE_PATTERNS = [
(re.compile(r'password["\s:=]+["\']?(\S+)'), 'password=***'),
(re.compile(r'token["\s:=]+["\']?(\S+)'), 'token=***'),
(re.compile(r'api[_-]?key["\s:=]+["\']?(\S+)'), 'api_key=***'),
]
def filter(self, record):
msg = record.getMessage()
for pattern, replacement in self.SENSITIVE_PATTERNS:
msg = pattern.sub(replacement, msg)
record.msg = msg
record.args = ()
return True
九、速率限制
import time
from collections import defaultdict
class RateLimiter:
"""滑动窗口速率限制器"""
def __init__(self, max_requests: int, window_seconds: int):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = defaultdict(list)
def is_allowed(self, key: str) -> bool:
now = time.time()
window_start = now - self.window_seconds
# 清理过期记录
self.requests[key] = [
t for t in self.requests[key] if t > window_start
]
if len(self.requests[key]) >= self.max_requests:
return False
self.requests[key].append(now)
return True
def get_retry_after(self, key: str) -> float:
if not self.requests[key]:
return 0
oldest = min(self.requests[key])
return max(0, oldest + self.window_seconds - time.time())
# 使用
limiter = RateLimiter(max_requests=100, window_seconds=60)
def handle_request(client_ip, request):
if not limiter.is_allowed(client_ip):
retry_after = limiter.get_retry_after(client_ip)
return {'error': '请求过于频繁', 'retry_after': retry_after}, 429
return process_request(request)
十、安全检查清单
1. 输入验证:所有外部输入都要验证和清洗
2. 参数化查询:永远不要拼接SQL字符串
3. 输出编码:根据上下文对输出进行编码(HTML/URL/JS)
4. 密码存储:使用bcrypt/argon2,永远不要明文或MD5
5. 会话管理:使用安全的随机令牌,设置过期时间
6. HTTPS:生产环境强制使用HTTPS
7. 最小权限:代码和服务只拥有必要的权限
8. 依赖安全:定期更新依赖,使用safety/pip-audit检查漏洞
9. 错误处理:不要向用户暴露内部错误详情
10. 日志安全:不要记录敏感信息(密码、令牌、个人数据)
# 依赖安全检查
# pip install pip-audit
# pip-audit
# pip install safety
# safety check
总结:安全编程不是事后补救,而是开发过程中的基本实践。遵循"永远不信任输入、最小权限、纵深防御"的原则,可以有效防止大多数常见安全漏洞。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)