什么是JWT?
是一种用于在网络应用程序之间安全传输信息的紧凑且独立的令牌格式。JWT通常由三个部分组成,分别是:
Header(头部): 包含令牌的类型(通常是JWT)和使用的签名算法(如HMAC SHA256)。
Payload(有效载荷): 包含声明(claims),这是需要传输的信息。声明可以是关于用户的,也可以是关于令牌本身的。
Signature(签名): 由头部、有效载荷和一个密钥生成,用于验证令牌的真实性。
header.payload.signature
Python 示例:校验JWT
以下是一个简单的Python示例,用于生成和校验JWT:
python复制代码import jwt
import datetime
# 定义密钥和算法
SECRET_KEY = 'your-256-bit-secret'
ALGORITHM = 'HS256'
# 生成JWT
def create_jwt(data, secret_key):
payload = {
'data': data,
'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=30) # 令牌过期时间
}
token = jwt.encode(payload, secret_key, algorithm=ALGORITHM)
return token
# 校验JWT
def verify_jwt(token, secret_key):
try:
# 校验和解码JWT
decoded_data = jwt.decode(token, secret_key, algorithms=[ALGORITHM])
return decoded_data
except jwt.ExpiredSignatureError:
return 'Token has expired'
except jwt.InvalidTokenError:
return 'Invalid token'
# 测试
if __name__ == '__main__':
user_data = {'username': 'test_user'}
# 创建JWT
token = create_jwt(user_data, SECRET_KEY)
print(f'Generated Token: {token}')
# 校验JWT
decoded_data = verify_jwt(token, SECRET_KEY)
if isinstance(decoded_data, dict):
print(f'Decoded Data: {decoded_data}')
else:
print(f'Error: {decoded_data}')
使用Python生成一个随机密钥
以下是如何使用Python生成一个安全密钥的例子:
python复制代码import secrets
# 生成一个32字节的随机密钥,并转换为十六进制表示
SECRET_KEY = secrets.token_hex(32)
print(f'Generated Secret Key: {SECRET_KEY}')
实际项目中如何管理密钥
存储在环境变量中:不要把密钥硬编码在代码里,而是将其存储在环境变量中,这样可以减少密钥泄露的风险。
import os SECRET_KEY = os.getenv('SECRET_KEY')
然后在你的服务器或开发环境中设置环境变量:
export SECRET_KEY='your-256-bit-secret'
使用密钥管理服务:在更复杂的应用中,可以使用云服务提供的密钥管理系统(如AWS KMS、GCP KMS)来管理和旋转密钥。
定期旋转密钥:定期更换密钥是一种良好的安全实践,尤其是在发现潜在的安全漏洞时。
密钥旋转
假设我们有一个系统使用SECRET_KEY
来加密JWT。在实现密钥旋转时,我们可以考虑以下步骤:
- 生成新密钥:在密钥旋转时,首先生成一个新密钥。
- 更新系统配置:将新密钥添加到系统配置中,确保新生成的JWT使用新密钥进行签名。
- 支持旧密钥解密:确保在一段时间内,系统能够使用旧密钥解密JWT,以支持那些在旋转之前生成的JWT。
- 逐步淘汰旧密钥:当系统中的所有JWT都使用新密钥时,可以安全地移除旧密钥。
自动化密钥旋转脚本示例
python复制代码import os
import secrets
import json
import time
# 配置文件路径(假设你使用JSON文件来存储密钥信息)
CONFIG_FILE = 'config.json'
def load_config():
"""加载配置文件"""
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r') as f:
return json.load(f)
return {}
def save_config(config):
"""保存配置文件"""
with open(CONFIG_FILE, 'w') as f:
json.dump(config, f, indent=4)
def generate_new_key():
"""生成新的随机密钥"""
return secrets.token_hex(32)
def rotate_keys():
"""旋转密钥的主逻辑"""
config = load_config()
# 获取当前密钥版本
current_version = config.get('current_version', 0)
# 生成新密钥
new_key = generate_new_key()
# 更新密钥信息
new_version = current_version + 1
config[f'secret_key_v{new_version}'] = new_key
config['current_version'] = new_version
# 选择保留的密钥版本数量(例如,保留当前版本和前一个版本)
max_versions_to_keep = 2
for version in range(new_version - max_versions_to_keep):
config.pop(f'secret_key_v{version}', None)
# 保存更新后的配置
save_config(config)
print(f"Keys rotated. New key version: {new_version}")
def get_current_key():
"""获取当前使用的密钥"""
config = load_config()
current_version = config.get('current_version')
return config.get(f'secret_key_v{current_version}')
def decrypt_with_old_keys(token):
"""尝试使用旧密钥解密数据"""
config = load_config()
current_version = config.get('current_version')
# 遍历旧密钥进行解密尝试
for version in range(current_version, -1, -1):
key = config.get(f'secret_key_v{version}')
if key:
# 在这里添加你的解密逻辑
try:
# 假设使用jwt解码
# decoded_data = jwt.decode(token, key, algorithms=['HS256'])
# return decoded_data
print(f"Token successfully decrypted with key version {version}.")
return True
except Exception:
continue
print("Failed to decrypt token with available keys.")
return False
# 主函数
if __name__ == "__main__":
# 旋转密钥(假设每隔一段时间旋转一次)
rotate_keys()
# 获取当前密钥(用来加密新数据)
current_key = get_current_key()
print(f"Current Key: {current_key}")
# 示例:假设有一个旧的JWT令牌,需要使用旧密钥解密
# token = 'your.jwt.token'
# decrypt_with_old_keys(token)
# 模拟定期旋转
# time.sleep(3600) # 每隔一小时旋转一次(根据需要调整)
脚本说明
- 配置文件:
- 该脚本使用一个JSON配置文件(
config.json
)来存储不同版本的密钥。 - 每次旋转密钥时,脚本会生成一个新密钥,并将其存储为一个新的版本,同时移除最早的密钥版本(根据保留版本数量)。
- 该脚本使用一个JSON配置文件(
- 密钥旋转:
rotate_keys
函数负责生成新密钥并更新配置文件。它会增加密钥版本号,并移除超出保留范围的旧版本密钥。
- 获取当前密钥:
get_current_key
函数用于获取当前版本的密钥,通常用于加密新的数据。
- 使用旧密钥解密:
decrypt_with_old_keys
函数示例展示了如何使用当前或旧的密钥尝试解密数据。在实际应用中,你可以将JWT解密逻辑替换到该函数中。
- 定期旋转:
- 你可以使用操作系统的定时任务(如Linux的Cron作业或Windows任务计划程序)来定期运行这个脚本,或者在脚本中用
time.sleep
来模拟定期旋转。
- 你可以使用操作系统的定时任务(如Linux的Cron作业或Windows任务计划程序)来定期运行这个脚本,或者在脚本中用
实际应用中的改进
- 密钥管理服务(KMS):在实际生产环境中,建议使用云服务提供的密钥管理系统来处理密钥旋转,以提高安全性和可管理性。
- 通知和日志记录:为密钥旋转操作添加通知和日志记录功能,以便在出现问题时可以快速响应。
- 安全性措施:确保配置文件(
config.json
)的访问权限受控,以防止未经授权的访问或修改。
[!IMPORTANT]
flask+JWT
from flask import Flask, request, jsonify, make_response
import jwt
import datetime
from functools import wraps
import secrets
app = Flask(__name__)
# 生成随机密钥
SECRET_KEY = secrets.token_hex(32)
app.config['SECRET_KEY'] = SECRET_KEY
# 模拟的用户数据(在实际应用中应从数据库获取)
users = {
'testuser': 'password123'
}
# 生成JWT装饰器,用于保护路由
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('x-access-token')
if not token:
return jsonify({'message': 'Token is missing!'}), 403
try:
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
current_user = data['username']
except jwt.ExpiredSignatureError:
return jsonify({'message': 'Token has expired!'}), 403
except jwt.InvalidTokenError:
return jsonify({'message': 'Invalid token!'}), 403
return f(current_user, *args, **kwargs)
return decorated
# 用户登录并生成JWT
@app.route('/login', methods=['POST'])
def login():
auth = request.authorization
if not auth or not auth.username or not auth.password:
return make_response('Could not verify', 401, {'WWW-Authenticate': 'Basic realm="Login required!"'})
user = users.get(auth.username)
if user and user == auth.password:
token = jwt.encode({
'username': auth.username,
'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=30)
}, app.config['SECRET_KEY'], algorithm='HS256')
return jsonify({'token': token})
return make_response('Could not verify', 401, {'WWW-Authenticate': 'Basic realm="Login required!"'})
# 受保护的路由
@app.route('/protected', methods=['GET'])
@token_required
def protected_route(current_user):
return jsonify({'message': f'Hello, {current_user}! You have access to this protected route.'})
# 获取新密钥
@app.route('/rotate_key', methods=['POST'])
def rotate_key():
global SECRET_KEY
SECRET_KEY = secrets.token_hex(32)
app.config['SECRET_KEY'] = SECRET_KEY
return jsonify({'message': 'Secret key has been rotated successfully!'})
if __name__ == '__main__':
app.run(debug=True)