Python安全编程实践
Python安全编程实践
一、输入验证
1.1 永远不要信任用户输入
# 不安全
def get_user_age(age_str):
return int(age_str) # 可能抛出异常
# 安全
def get_user_age(age_str):
try:
age = int(age_str)
if not 0 <= age <= 150:
raise ValueError("年龄超出合理范围")
return age
except ValueError:
raise ValueError("无效的年龄格式")
1.2 使用白名单而不是黑名单
# 不安全:黑名单
def validate_username(username):
forbidden = ['admin', 'root', 'system']
if username in forbidden:
raise ValueError("用户名不允许")
return username
# 安全:白名单
import re
def validate_username(username):
# 只允许字母、数字和下划线,3-20个字符
if not re.match(r'^[a-zA-Z0-9_]{3,20}$', username):
raise ValueError("用户名格式不正确")
return username
二、SQL注入防护
2.1 使用参数化查询
import sqlite3
# 不安全:字符串拼接
def get_user_unsafe(username):
conn = sqlite3.connect('users.db')
cursor = conn.cursor()
query = f"SELECT * FROM users WHERE username = '{username}'"
cursor.execute(query) # SQL注入风险!
return cursor.fetchone()
# 安全:参数化查询
def get_user_safe(username):
conn = sqlite3.connect('users.db')
cursor = conn.cursor()
query = "SELECT * FROM users WHERE username = ?"
cursor.execute(query, (username,))
return cursor.fetchone()
2.2 使用ORM
from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String)
# ORM自动处理SQL注入
def get_user(session, username):
return session.query(User).filter(User.username == username).first()
三、命令注入防护
3.1 避免使用shell=True
import subprocess
# 不安全
def ping_host_unsafe(host):
command = f"ping -c 1 {host}"
subprocess.run(command, shell=True) # 命令注入风险!
# 安全
def ping_host_safe(host):
# 验证输入
import re
if not re.match(r'^[a-zA-Z0-9.-]+$', host):
raise ValueError("无效的主机名")
# 使用列表形式,不使用shell
subprocess.run(['ping', '-c', '1', host], shell=False)
3.2 使用shlex转义
import shlex
import subprocess
def run_command_safe(user_input):
# 转义用户输入
safe_input = shlex.quote(user_input)
command = f"echo {safe_input}"
subprocess.run(command, shell=True)
四、XSS防护
4.1 转义HTML输出
import html
def display_user_input(user_input):
# 不安全
# return f"
"
# 安全:转义HTML
safe_input = html.escape(user_input)
return f"
"
4.2 使用模板引擎
from jinja2 import Template
# Jinja2自动转义
template = Template("
")
output = template.render(user_input="")
# 输出:
五、密码安全
5.1 使用bcrypt哈希密码
# 安装: pip install bcrypt
import bcrypt
def hash_password(password):
# 生成盐并哈希密码
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
return hashed
def verify_password(password, hashed):
# 验证密码
return bcrypt.checkpw(password.encode('utf-8'), hashed)
# 使用
password = "my_secure_password"
hashed = hash_password(password)
# 验证
if verify_password("my_secure_password", hashed):
print("密码正确")
5.2 密码强度验证
import re
def validate_password_strength(password):
"""
密码必须:
- 至少8个字符
- 包含大写字母
- 包含小写字母
- 包含数字
- 包含特殊字符
"""
if len(password) < 8:
return False, "密码至少8个字符"
if not re.search(r'[A-Z]', password):
return False, "密码必须包含大写字母"
if not re.search(r'[a-z]', password):
return False, "密码必须包含小写字母"
if not re.search(r'\d', password):
return False, "密码必须包含数字"
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
return False, "密码必须包含特殊字符"
return True, "密码强度足够"
六、敏感数据保护
6.1 不要在代码中硬编码密钥
# 不安全
API_KEY = "sk-1234567890abcdef"
# 安全:使用环境变量
import os
API_KEY = os.getenv('API_KEY')
if not API_KEY:
raise ValueError("API_KEY环境变量未设置")
6.2 加密敏感数据
# 安装: pip install cryptography
from cryptography.fernet import Fernet
# 生成密钥(只做一次,保存到安全位置)
key = Fernet.generate_key()
cipher = Fernet(key)
# 加密
def encrypt_data(data):
return cipher.encrypt(data.encode())
# 解密
def decrypt_data(encrypted_data):
return cipher.decrypt(encrypted_data).decode()
# 使用
sensitive_data = "信用卡号: 1234-5678-9012-3456"
encrypted = encrypt_data(sensitive_data)
decrypted = decrypt_data(encrypted)
七、文件操作安全
7.1 路径遍历防护
import os
from pathlib import Path
def read_user_file_unsafe(filename):
# 不安全:可能读取任意文件
with open(filename) as f:
return f.read()
def read_user_file_safe(filename, base_dir='/var/app/uploads'):
# 安全:验证路径
base_path = Path(base_dir).resolve()
file_path = (base_path / filename).resolve()
# 确保文件在允许的目录内
if not str(file_path).startswith(str(base_path)):
raise ValueError("非法的文件路径")
with open(file_path) as f:
return f.read()
7.2 安全的文件上传
import os
import uuid
from werkzeug.utils import secure_filename
def save_uploaded_file(file, upload_dir='/var/app/uploads'):
# 验证文件类型
allowed_extensions = {'.jpg', '.png', '.pdf'}
ext = os.path.splitext(file.filename)[1].lower()
if ext not in allowed_extensions:
raise ValueError("不允许的文件类型")
# 生成安全的文件名
safe_name = secure_filename(file.filename)
unique_name = f"{uuid.uuid4()}{ext}"
# 保存文件
file_path = os.path.join(upload_dir, unique_name)
file.save(file_path)
return unique_name
八、会话安全
8.1 使用安全的会话管理
from flask import Flask, session
import secrets
app = Flask(__name__)
app.secret_key = secrets.token_hex(32) # 生成强随机密钥
# 配置会话
app.config.update(
SESSION_COOKIE_SECURE=True, # 只通过HTTPS传输
SESSION_COOKIE_HTTPONLY=True, # 防止JavaScript访问
SESSION_COOKIE_SAMESITE='Lax', # CSRF防护
PERMANENT_SESSION_LIFETIME=1800 # 30分钟超时
)
8.2 CSRF防护
from flask_wtf.csrf import CSRFProtect
app = Flask(__name__)
csrf = CSRFProtect(app)
# 在表单中包含CSRF令牌
#
九、API安全
9.1 速率限制
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
app = Flask(__name__)
limiter = Limiter(
app,
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"]
)
@app.route('/api/data')
@limiter.limit("10 per minute")
def get_data():
return {'data': 'value'}
9.2 API密钥验证
from functools import wraps
from flask import request, jsonify
def require_api_key(f):
@wraps(f)
def decorated_function(*args, **kwargs):
api_key = request.headers.get('X-API-Key')
if not api_key:
return jsonify({'error': '缺少API密钥'}), 401
if not validate_api_key(api_key):
return jsonify({'error': '无效的API密钥'}), 403
return f(*args, **kwargs)
return decorated_function
@app.route('/api/protected')
@require_api_key
def protected_endpoint():
return {'data': 'protected'}
十、依赖安全
10.1 检查依赖漏洞
# 安装safety
pip install safety
# 检查已安装的包
safety check
# 检查requirements.txt
safety check -r requirements.txt
10.2 固定依赖版本
# requirements.txt
# 不安全:使用>=可能引入有漏洞的新版本
requests>=2.0.0
# 安全:固定版本
requests==2.28.1
# 或使用pip freeze
pip freeze > requirements.txt
十一、日志安全
11.1 不要记录敏感信息
import logging
# 不安全
logging.info(f"用户登录: {username}, 密码: {password}")
# 安全
logging.info(f"用户登录: {username}")
11.2 过滤敏感数据
class SensitiveDataFilter(logging.Filter):
def filter(self, record):
# 过滤密码、令牌等敏感信息
message = record.getMessage()
message = re.sub(r'password=\S+', 'password=***', message)
message = re.sub(r'token=\S+', 'token=***', message)
record.msg = message
return True
logger = logging.getLogger()
logger.addFilter(SensitiveDataFilter())
十二、安全配置
12.1 生产环境配置
# config.py
class ProductionConfig:
DEBUG = False
TESTING = False
SECRET_KEY = os.getenv('SECRET_KEY')
# 数据库
SQLALCHEMY_DATABASE_URI = os.getenv('DATABASE_URL')
SQLALCHEMY_TRACK_MODIFICATIONS = False
# 会话
SESSION_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'
# HTTPS
PREFERRED_URL_SCHEME = 'https'
十三、安全测试
13.1 使用bandit扫描
# 安装
pip install bandit
# 扫描代码
bandit -r myproject/
# 生成报告
bandit -r myproject/ -f html -o report.html
13.2 编写安全测试
import pytest
def test_sql_injection():
"""测试SQL注入防护"""
malicious_input = "'; DROP TABLE users; --"
with pytest.raises(ValueError):
get_user(malicious_input)
def test_xss_protection():
"""测试XSS防护"""
malicious_input = ""
output = display_user_input(malicious_input)
assert '
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)