-
Notifications
You must be signed in to change notification settings - Fork 1k
Expand file tree
/
Copy pathdependencies.py
More file actions
132 lines (102 loc) · 3.82 KB
/
dependencies.py
File metadata and controls
132 lines (102 loc) · 3.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# @Time : 2023/8/15 17:43
# @Author : Lan
# @File : depends.py
# @Software: PyCharm
from fastapi import Header, HTTPException, Depends
from fastapi.requests import Request
import base64
import hmac
import json
import time
from core.settings import settings
from apps.admin.services import FileService, ConfigService, LocalFileService
def create_token(data: dict, expires_in: int = 3600 * 24 * 30) -> str:
"""
创建JWT token
:param data: 数据负载
:param expires_in: 过期时间(秒)
"""
header = base64.b64encode(
json.dumps({"alg": "HS256", "typ": "JWT"}).encode()
).decode()
payload = base64.b64encode(
json.dumps({**data, "exp": int(time.time()) + expires_in}).encode()
).decode()
signature = hmac.new(
settings.admin_token.encode(), f"{header}.{payload}".encode(), "sha256"
).digest()
signature = base64.b64encode(signature).decode()
return f"{header}.{payload}.{signature}"
def verify_token(token: str) -> dict:
"""
验证JWT token
:param token: JWT token
:return: 解码后的数据
"""
try:
header_b64, payload_b64, signature_b64 = token.split(".")
# 验证签名
expected_signature = hmac.new(
settings.admin_token.encode(),
f"{header_b64}.{payload_b64}".encode(),
"sha256",
).digest()
expected_signature_b64 = base64.b64encode(expected_signature).decode()
if not hmac.compare_digest(signature_b64, expected_signature_b64):
raise ValueError("无效的签名")
# 解码payload
payload = json.loads(base64.b64decode(payload_b64))
# 检查是否过期
if payload.get("exp", 0) < time.time():
raise ValueError("token已过期")
return payload
except Exception as e:
raise ValueError(f"token验证失败: {str(e)}")
def _extract_bearer_token(authorization: str) -> str:
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="未授权或授权校验失败")
token = authorization.split(" ", 1)[1].strip()
if not token:
raise HTTPException(status_code=401, detail="未授权或授权校验失败")
return token
def _require_admin_payload(authorization: str) -> dict:
token = _extract_bearer_token(authorization)
try:
payload = verify_token(token)
except ValueError as e:
raise HTTPException(status_code=401, detail=str(e))
if not payload.get("is_admin", False):
raise HTTPException(status_code=401, detail="未授权或授权校验失败")
return payload
ADMIN_PUBLIC_ENDPOINTS = {("POST", "/admin/login")}
async def admin_required(
authorization: str = Header(default=None), request: Request = None
):
"""
验证管理员权限
"""
if request and (request.method, request.url.path) in ADMIN_PUBLIC_ENDPOINTS:
return None
return _require_admin_payload(authorization)
async def share_required_login(authorization: str = Header(default=None)):
"""
验证分享上传权限
当settings.openUpload为False时,要求用户必须登录并具有管理员权限
当settings.openUpload为True时,允许游客上传
:param authorization: 认证头信息
:param request: 请求对象
:return: 验证结果
"""
if not settings.openUpload:
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(
status_code=403, detail="本站未开启游客上传,如需上传请先登录后台"
)
_require_admin_payload(authorization)
return True
async def get_file_service():
return FileService()
async def get_config_service():
return ConfigService()
async def get_local_file_service():
return LocalFileService()