Python安全编程实践

一、输入验证

1.1 永远不要信任用户输入

# 不安全
def get_user_age(age_str):
return int(age_str) # 可能抛出异常

# 安全
def get_user_age(age_str):
try:
age = int(age_str)
if not 0 <= age <= 150:
raise ValueError("年龄超出合理范围")
return age
except ValueError:
raise ValueError("无效的年龄格式")

1.2 使用白名单而不是黑名单

# 不安全:黑名单
def validate_username(username):
forbidden = ['admin', 'root', 'system']
if username in forbidden:
raise ValueError("用户名不允许")
return username

# 安全:白名单
import re

def validate_username(username):
# 只允许字母、数字和下划线,3-20个字符
if not re.match(r'^[a-zA-Z0-9_]{3,20}$', username):
raise ValueError("用户名格式不正确")
return username

二、SQL注入防护

2.1 使用参数化查询

import sqlite3

# 不安全:字符串拼接
def get_user_unsafe(username):
conn = sqlite3.connect('users.db')
cursor = conn.cursor()
query = f"SELECT * FROM users WHERE username = '{username}'"
cursor.execute(query) # SQL注入风险!
return cursor.fetchone()

# 安全:参数化查询
def get_user_safe(username):
conn = sqlite3.connect('users.db')
cursor = conn.cursor()
query = "SELECT * FROM users WHERE username = ?"
cursor.execute(query, (username,))
return cursor.fetchone()

2.2 使用ORM

from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String)

# ORM自动处理SQL注入
def get_user(session, username):
return session.query(User).filter(User.username == username).first()

三、命令注入防护

3.1 避免使用shell=True

import subprocess

# 不安全
def ping_host_unsafe(host):
command = f"ping -c 1 {host}"
subprocess.run(command, shell=True) # 命令注入风险!

# 安全
def ping_host_safe(host):
# 验证输入
import re
if not re.match(r'^[a-zA-Z0-9.-]+$', host):
raise ValueError("无效的主机名")

# 使用列表形式,不使用shell
subprocess.run(['ping', '-c', '1', host], shell=False)

3.2 使用shlex转义

import shlex
import subprocess

def run_command_safe(user_input):
# 转义用户输入
safe_input = shlex.quote(user_input)
command = f"echo {safe_input}"
subprocess.run(command, shell=True)

四、XSS防护

4.1 转义HTML输出

import html

def display_user_input(user_input):
# 不安全
# return f"

{user_input}

"

# 安全:转义HTML
safe_input = html.escape(user_input)
return f"

{safe_input}

"

4.2 使用模板引擎

from jinja2 import Template

# Jinja2自动转义
template = Template("

{{ user_input }}

")
output = template.render(user_input="")
# 输出:

<script>alert('XSS')</script>


五、密码安全

5.1 使用bcrypt哈希密码

# 安装: pip install bcrypt

import bcrypt

def hash_password(password):
# 生成盐并哈希密码
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
return hashed

def verify_password(password, hashed):
# 验证密码
return bcrypt.checkpw(password.encode('utf-8'), hashed)

# 使用
password = "my_secure_password"
hashed = hash_password(password)

# 验证
if verify_password("my_secure_password", hashed):
print("密码正确")

5.2 密码强度验证

import re

def validate_password_strength(password):
"""
密码必须:
- 至少8个字符
- 包含大写字母
- 包含小写字母
- 包含数字
- 包含特殊字符
"""
if len(password) < 8:
return False, "密码至少8个字符"

if not re.search(r'[A-Z]', password):
return False, "密码必须包含大写字母"

if not re.search(r'[a-z]', password):
return False, "密码必须包含小写字母"

if not re.search(r'\d', password):
return False, "密码必须包含数字"

if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
return False, "密码必须包含特殊字符"

return True, "密码强度足够"

六、敏感数据保护

6.1 不要在代码中硬编码密钥

# 不安全
API_KEY = "sk-1234567890abcdef"

# 安全:使用环境变量
import os

API_KEY = os.getenv('API_KEY')
if not API_KEY:
raise ValueError("API_KEY环境变量未设置")

6.2 加密敏感数据

# 安装: pip install cryptography

from cryptography.fernet import Fernet

# 生成密钥(只做一次,保存到安全位置)
key = Fernet.generate_key()
cipher = Fernet(key)

# 加密
def encrypt_data(data):
return cipher.encrypt(data.encode())

# 解密
def decrypt_data(encrypted_data):
return cipher.decrypt(encrypted_data).decode()

# 使用
sensitive_data = "信用卡号: 1234-5678-9012-3456"
encrypted = encrypt_data(sensitive_data)
decrypted = decrypt_data(encrypted)

七、文件操作安全

7.1 路径遍历防护

import os
from pathlib import Path

def read_user_file_unsafe(filename):
# 不安全:可能读取任意文件
with open(filename) as f:
return f.read()

def read_user_file_safe(filename, base_dir='/var/app/uploads'):
# 安全:验证路径
base_path = Path(base_dir).resolve()
file_path = (base_path / filename).resolve()

# 确保文件在允许的目录内
if not str(file_path).startswith(str(base_path)):
raise ValueError("非法的文件路径")

with open(file_path) as f:
return f.read()

7.2 安全的文件上传

import os
import uuid
from werkzeug.utils import secure_filename

def save_uploaded_file(file, upload_dir='/var/app/uploads'):
# 验证文件类型
allowed_extensions = {'.jpg', '.png', '.pdf'}
ext = os.path.splitext(file.filename)[1].lower()

if ext not in allowed_extensions:
raise ValueError("不允许的文件类型")

# 生成安全的文件名
safe_name = secure_filename(file.filename)
unique_name = f"{uuid.uuid4()}{ext}"

# 保存文件
file_path = os.path.join(upload_dir, unique_name)
file.save(file_path)

return unique_name

八、会话安全

8.1 使用安全的会话管理

from flask import Flask, session
import secrets

app = Flask(__name__)
app.secret_key = secrets.token_hex(32) # 生成强随机密钥

# 配置会话
app.config.update(
SESSION_COOKIE_SECURE=True, # 只通过HTTPS传输
SESSION_COOKIE_HTTPONLY=True, # 防止JavaScript访问
SESSION_COOKIE_SAMESITE='Lax', # CSRF防护
PERMANENT_SESSION_LIFETIME=1800 # 30分钟超时
)

8.2 CSRF防护

from flask_wtf.csrf import CSRFProtect

app = Flask(__name__)
csrf = CSRFProtect(app)

# 在表单中包含CSRF令牌
#

九、API安全

9.1 速率限制

from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

app = Flask(__name__)
limiter = Limiter(
app,
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"]
)

@app.route('/api/data')
@limiter.limit("10 per minute")
def get_data():
return {'data': 'value'}

9.2 API密钥验证

from functools import wraps
from flask import request, jsonify

def require_api_key(f):
@wraps(f)
def decorated_function(*args, **kwargs):
api_key = request.headers.get('X-API-Key')

if not api_key:
return jsonify({'error': '缺少API密钥'}), 401

if not validate_api_key(api_key):
return jsonify({'error': '无效的API密钥'}), 403

return f(*args, **kwargs)
return decorated_function

@app.route('/api/protected')
@require_api_key
def protected_endpoint():
return {'data': 'protected'}

十、依赖安全

10.1 检查依赖漏洞

# 安装safety
pip install safety

# 检查已安装的包
safety check

# 检查requirements.txt
safety check -r requirements.txt

10.2 固定依赖版本

# requirements.txt
# 不安全:使用>=可能引入有漏洞的新版本
requests>=2.0.0

# 安全:固定版本
requests==2.28.1

# 或使用pip freeze
pip freeze > requirements.txt

十一、日志安全

11.1 不要记录敏感信息

import logging

# 不安全
logging.info(f"用户登录: {username}, 密码: {password}")

# 安全
logging.info(f"用户登录: {username}")

11.2 过滤敏感数据

class SensitiveDataFilter(logging.Filter):
def filter(self, record):
# 过滤密码、令牌等敏感信息
message = record.getMessage()
message = re.sub(r'password=\S+', 'password=***', message)
message = re.sub(r'token=\S+', 'token=***', message)
record.msg = message
return True

logger = logging.getLogger()
logger.addFilter(SensitiveDataFilter())

十二、安全配置

12.1 生产环境配置

# config.py
class ProductionConfig:
DEBUG = False
TESTING = False
SECRET_KEY = os.getenv('SECRET_KEY')

# 数据库
SQLALCHEMY_DATABASE_URI = os.getenv('DATABASE_URL')
SQLALCHEMY_TRACK_MODIFICATIONS = False

# 会话
SESSION_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'

# HTTPS
PREFERRED_URL_SCHEME = 'https'

十三、安全测试

13.1 使用bandit扫描

# 安装
pip install bandit

# 扫描代码
bandit -r myproject/

# 生成报告
bandit -r myproject/ -f html -o report.html

13.2 编写安全测试

import pytest

def test_sql_injection():
"""测试SQL注入防护"""
malicious_input = "'; DROP TABLE users; --"
with pytest.raises(ValueError):
get_user(malicious_input)

def test_xss_protection():
"""测试XSS防护"""
malicious_input = ""
output = display_user_input(malicious_input)
assert '

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐