基于 OpenClaw + Claude Code 的端到端研发自动化系统
本认证模块是"基于 OpenClaw + Claude Code 的端到端研发自动化系统"的核心组件,为整个研发流程提供统一、安全、可扩展的用户身份认证与授权服务。 系统采用业界标准的 OAuth2.0 协议和 JWT(JSON Web Token)技术,支持多种认证场景,包括用户名密码登录、第三方 OAuth 登录、客户端凭证认证等。
Access Token + Refresh Token 双令牌设计,Access Token 短期有效(默认 60 分钟),Refresh Token 长期有效(默认 7 天),支持无感刷新
完整实现 Authorization Code Flow,支持 PKCE 增强安全,适用于前后端分离应用和移动端应用
每次刷新令牌时自动轮换,旧令牌立即失效;支持主动登出撤销所有令牌,防止令牌泄露风险
基于角色的访问控制(Role-Based Access Control),支持用户角色分配和细粒度权限管理
记录所有认证相关操作(登录、登出、令牌刷新、密码修改等),支持安全审计和异常检测
预留 GitHub、GitLab、Google、Microsoft 等第三方 OAuth 登录接口,支持快速集成
支持用户注册后邮箱验证、忘记密码时的安全重置流程,包含防暴力破解机制
内置请求速率限制,防止暴力破解和 DDoS 攻击,保护认证端点安全
auth_module/
├── __init__.py # 包初始化
├── main.py # FastAPI 应用入口
├── models.py # SQLAlchemy 数据模型
├── schemas.py # Pydantic 数据验证模式
├── security.py # 安全工具(JWT、密码哈希)
├── crud.py # 数据库 CRUD 操作
├── database.py # 数据库连接配置
├── routes_auth.py # 认证相关 API 路由
├── routes_oauth2.py # OAuth2 相关 API 路由
├── tests/
│ ├── __init__.py
│ └── test_auth.py # 单元测试套件
├── requirements.txt # Python 依赖
├── Dockerfile # Docker 镜像构建
├── docker-compose.yml # Docker Compose 配置
└── .env.example # 环境变量模板
| 安全层 | 实现方式 | 防护目标 |
|---|---|---|
| 传输层 | HTTPS/TLS 1.3 | 防止中间人攻击、数据窃听 |
| 密码存储 | bcrypt 哈希(cost=12) | 防止彩虹表攻击、数据库泄露后密码暴露 |
| 令牌安全 | JWT HS256 签名 + 过期时间 | 防止令牌伪造、重放攻击 |
| CSRF 防护 | State 参数验证 | 防止跨站请求伪造 |
| PKCE | Code Challenge (S256) | 防止授权码拦截攻击 |
| 速率限制 | 滑动窗口限流 | 防止暴力破解、DDoS 攻击 |
| 审计日志 | 全量操作记录 | 安全事件追溯、异常检测 |
| 令牌轮换 | Refresh Token 一次性使用 | 降低令牌泄露风险 |
# Access Token Payload 示例
{
"sub": "user-uuid-12345", # 用户唯一标识
"username": "john.doe", # 用户名
"roles": ["developer"], # 用户角色列表
"department": "Engineering", # 所属部门
"exp": 1710432000, # 过期时间戳
"iat": 1710428400, # 签发时间戳
"type": "access", # 令牌类型
"jti": "unique-token-id" # 令牌唯一 ID
}
| 方法 | 路径 | 描述 | 认证 |
|---|---|---|---|
| POST | /api/v1/auth/register |
用户注册 | ❌ |
| POST | /api/v1/auth/token |
登录获取令牌(OAuth2 Password Grant) | ❌ |
| POST | /api/v1/auth/refresh |
刷新访问令牌 | ❌ |
| POST | /api/v1/auth/logout |
用户登出(撤销令牌) | ✅ |
| GET | /api/v1/auth/me |
获取当前用户信息 | ✅ |
| PUT | /api/v1/auth/me |
更新用户信息 | ✅ |
| POST | /api/v1/auth/change-password |
修改密码 | ✅ |
| GET | /api/v1/auth/audit-logs |
获取审计日志 | ✅ |
| 方法 | 路径 | 描述 | 认证 |
|---|---|---|---|
| GET | /api/v1/oauth2/authorize |
OAuth2 授权端点(获取授权码) | ✅ |
| POST | /api/v1/oauth2/token |
OAuth2 令牌端点(授权码/刷新令牌/客户端凭证) | ❌ |
| POST | /api/v1/oauth2/clients |
创建 OAuth2 客户端 | ✅ Admin |
| GET | /api/v1/oauth2/clients |
获取客户端列表 | ✅ Admin |
curl -X POST http://localhost:8000/api/v1/auth/register \
-H "Content-Type: application/json" \
-d '{
"username": "newuser",
"email": "user@example.com",
"password": "SecurePass@123",
"full_name": "New User",
"department": "Engineering"
}'
curl -X POST http://localhost:8000/api/v1/auth/token \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "username=testuser&password=Test@123456"
# 响应示例
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "bearer",
"expires_in": 3600
}
curl -X GET http://localhost:8000/api/v1/auth/me \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIs..."
| 表名 | 描述 | 关键字段 |
|---|---|---|
users |
用户基本信息表 | id, user_uuid, username, email, hashed_password, role, is_active |
refresh_tokens |
刷新令牌记录表 | id, token_hash, user_id, expires_at, is_revoked |
oauth_accounts |
第三方 OAuth 账户关联表 | provider, provider_user_id, access_token, refresh_token |
auth_codes |
OAuth2 授权码临时存储 | code_uuid, user_id, client_id, redirect_uri, expires_at |
oauth_clients |
OAuth2 客户端注册信息 | client_id, client_secret, redirect_uris, grant_types |
audit_logs |
审计日志表 | action, status, user_id, ip_address, details, created_at |
roles |
角色定义表(RBAC) | role_name, permissions |
permissions |
权限定义表 | permission_name, resource, action |
┌─────────────┐ ┌──────────────────┐ ┌──────────────┐
│ users │◄──────│ refresh_tokens │ │ oauth_clients│
├─────────────┤ ├──────────────────┤ ├──────────────┤
│ id │ │ id │ │ id │
│ user_uuid │ │ token_hash │ │ client_id │
│ username │ │ user_id (FK) │ │ client_secret│
│ email │ │ expires_at │ │ redirect_uris│
│ role │ │ is_revoked │ └──────────────┘
└──────┬──────┘ └──────────────────┘
│
│ 1:N
├──────────────────────────────────────────┐
│ │
▼ ▼
┌─────────────┐ ┌──────────────┐
│oauth_accounts│ │ auth_codes │
├─────────────┤ ├──────────────┤
│ provider │ │ code_uuid │
│ provider_id │ │ user_id (FK) │
│ access_token│ │ client_id │
└─────────────┘ └──────────────┘
▲
│
▼
┌─────────────┐
│ audit_logs │
├─────────────┤
│ action │
│ user_id(FK) │
│ created_at │
└─────────────┘
# 开发环境启动
docker-compose up -d postgres redis
docker-compose --profile dev up -d
# 生产环境启动
docker-compose --profile production up -d
# 查看日志
docker-compose logs -f auth-service
# 运行测试
docker-compose exec auth-service pytest
# k8s-deployment.yaml 核心配置
apiVersion: apps/v1
kind: Deployment
metadata:
name: auth-module
spec:
replicas: 3
selector:
matchLabels:
app: auth-module
template:
spec:
containers:
- name: auth-service
image: registry.example.com/auth-module:1.0.0
ports:
- containerPort: 8000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: auth-secrets
key: database-url
- name: JWT_SECRET_KEY
valueFrom:
secretKeyRef:
name: auth-secrets
key: jwt-secret
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 10
periodSeconds: 30
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 10
resources:
requests:
memory: "128Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "1000m"
| 变量名 | 默认值 | 描述 |
|---|---|---|
DATABASE_URL |
sqlite:///./auth_db.sqlite3 | 数据库连接字符串 |
JWT_SECRET_KEY |
(必须自定义) | JWT 签名密钥(生产环境必须修改) |
ACCESS_TOKEN_EXPIRE_MINUTES |
60 | Access Token 有效期(分钟) |
REFRESH_TOKEN_EXPIRE_DAYS |
7 | Refresh Token 有效期(天) |
LOG_LEVEL |
info | 日志级别(debug/info/warning/error) |
APP_ENV |
development | 应用环境(development/production) |
def test_login_success(client, test_user):
"""测试成功登录"""
response = client.post(
"/api/v1/auth/token",
data={
"username": "testuser",
"password": "Test@123456"
}
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert "refresh_token" in data
assert data["token_type"] == "bearer"
def test_refresh_token_success(client, test_user):
"""测试成功刷新令牌"""
# 先登录获取令牌
login_response = client.post(
"/api/v1/auth/token",
data={"username": "testuser", "password": "Test@123456"}
)
refresh_token = login_response.json()["refresh_token"]
# 刷新令牌
refresh_response = client.post(
"/api/v1/auth/refresh",
json={"refresh_token": refresh_token}
)
assert refresh_response.status_code == 200
assert "access_token" in refresh_response.json()
# 运行所有测试
pytest auth_module/tests/ -v
# 运行测试并生成覆盖率报告
pytest auth_module/tests/ --cov=auth_module --cov-report=html
# 运行特定测试类
pytest auth_module/tests/test_auth.py::TestLogin -v
# 运行测试并输出详细日志
pytest auth_module/tests/ -v --tb=long -s
import { createContext, useContext, useState } from 'react';
const AuthContext = createContext();
export function AuthProvider({ children }) {
const [token, setToken] = useState(localStorage.getItem('access_token'));
const login = async (username, password) => {
const response = await fetch('/api/v1/auth/token', {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({ username, password })
});
const data = await response.json();
localStorage.setItem('access_token', data.access_token);
localStorage.setItem('refresh_token', data.refresh_token);
setToken(data.access_token);
};
const refreshToken = async () => {
const refresh_token = localStorage.getItem('refresh_token');
const response = await fetch('/api/v1/auth/refresh', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ refresh_token })
});
const data = await response.json();
localStorage.setItem('access_token', data.access_token);
localStorage.setItem('refresh_token', data.refresh_token);
setToken(data.access_token);
};
return (
{children}
);
}