基于 OpenClaw + Claude Code 的端到端研发自动化系统 · AI Coding Agent 核心规范文档
本文档定义了基于 OpenClaw + Claude Code 的端到端研发自动化系统中,AI Coding Agent 生成后端代码时必须遵循的标准模板和规范约束规则。通过统一的代码规范和模板,确保生成的代码具备高质量、可维护性、安全性和一致性。
系统采用经典的分层架构,各层职责清晰,依赖关系单向,确保代码的可维护性和可测试性。
上层可以依赖下层,下层不能依赖上层。Controller → Service → Repository → Entity
Controller 不能直接调用 Repository,必须通过 Service 层。特殊情况需经过架构评审。
Service 层依赖抽象接口而非具体实现,便于单元测试和 Mock。
统一的项目结构有助于 AI 理解和生成代码,也便于开发人员快速定位和维护。
Controller(API Layer)负责处理 HTTP 请求,进行参数验证、认证授权,并调用 Service 层执行业务逻辑。
# src/api/v1/users.py from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from typing import List, Optional from ...core.database import get_db from ...core.security import get_current_user from ...services.user_service import UserService from ...schemas.user_schema import ( UserCreate, UserUpdate, UserResponse, UserListResponse ) from ...models.user import User router = APIRouter(prefix="/users", tags=["用户管理"]) @router.post("", response_model=UserResponse, status_code=status.HTTP_201_CREATED) async def create_user( user_data: UserCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ) -> UserResponse: """ 创建新用户 - **username**: 用户名 (3-20 字符,字母数字下划线) - **email**: 邮箱地址 (必须符合邮箱格式) - **password**: 密码 (最少 8 位,包含大小写字母和数字) - **role**: 用户角色 (可选:user, admin, super_admin) 返回创建成功的用户信息(不包含密码) """ service = UserService(db) # 检查用户名是否已存在 existing_user = service.get_by_username(user_data.username) if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail={"code": "USER_ALREADY_EXISTS", "message": "用户名已存在"} ) # 创建用户 created_user = service.create(user_data) return created_user @router.get("/{user_id}", response_model=UserResponse) async def get_user( user_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ) -> UserResponse: """获取用户详情""" service = UserService(db) user = service.get_by_id(user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail={"code": "USER_NOT_FOUND", "message": "用户不存在"} ) return user @router.get("", response_model=UserListResponse) async def list_users( skip: int = 0, limit: int = 20, keyword: Optional[str] = None, role: Optional[str] = None, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ) -> UserListResponse: """ 获取用户列表(支持分页、搜索、过滤) - **skip**: 跳过记录数 (默认 0) - **limit**: 每页数量 (默认 20,最大 100) - **keyword**: 搜索关键词 (匹配用户名或邮箱) - **role**: 按角色过滤 """ if limit > 100: limit = 100 service = UserService(db) users, total = service.list(skip=skip, limit=limit, keyword=keyword, role=role) return UserListResponse( items=users, total=total, skip=skip, limit=limit ) @router.put("/{user_id}", response_model=UserResponse) async def update_user( user_id: str, user_data: UserUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ) -> UserResponse: """更新用户信息""" service = UserService(db) updated_user = service.update(user_id, user_data) if not updated_user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail={"code": "USER_NOT_FOUND", "message": "用户不存在"} ) return updated_user @router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_user( user_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """删除用户""" service = UserService(db) deleted = service.delete(user_id) if not deleted: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail={"code": "USER_NOT_FOUND", "message": "用户不存在"} )
Controller 只负责请求处理、参数验证、认证授权,不包含业务逻辑。业务逻辑必须委托给 Service 层。
所有 Controller 方法必须使用 async def 定义,支持高并发场景。
每个接口必须指定 response_model,确保返回数据结构一致,便于前端理解和自动化测试。
成功返回 200/201,创建资源返回 201,删除返回 204,错误使用对应 HTTP 状态码。
每个接口必须有详细的 docstring,描述功能、参数说明、返回值、可能的错误。
Service 层是业务逻辑的核心,负责处理复杂的业务流程、事务管理、跨模块协调等。
# src/services/user_service.py from sqlalchemy.orm import Session from sqlalchemy import or_, and_ from typing import List, Tuple, Optional from ..models.user import User from ..repositories.user_repository import UserRepository from ..schemas.user_schema import UserCreate, UserUpdate, UserResponse from ..core.security import hash_password, verify_password from ..core.exceptions import BusinessException class UserService: """ 用户业务逻辑服务类 负责用户相关的核心业务逻辑,包括: - 用户创建、更新、删除 - 用户认证和授权 - 用户信息查询和统计 """ def __init__(self, db: Session): """初始化服务,注入数据库会话和 Repository""" self.db = db self.repository = UserRepository(db) def create(self, user_data: UserCreate) -> UserResponse: """ 创建用户 Args: user_data: 用户创建数据 Returns: 创建成功的用户信息 Raises: BusinessException: 当用户名已存在时 """ # 密码加密 hashed_password = hash_password(user_data.password) # 创建用户实体 user = User( username=user_data.username, email=user_data.email.lower(), password_hash=hashed_password, role=user_data.role.value if hasattr(user_data.role, 'value') else user_data.role ) # 保存到数据库 created_user = self.repository.create(user) self.db.commit() self.db.refresh(created_user) return UserResponse.model_validate(created_user) def get_by_id(self, user_id: str) -> Optional[UserResponse]: """根据 ID 获取用户""" user = self.repository.find_by_id(user_id) if not user: return None return UserResponse.model_validate(user) def get_by_username(self, username: str) -> Optional[UserResponse]: """根据用户名获取用户""" user = self.repository.find_by_username(username) if not user: return None return UserResponse.model_validate(user) def list( self, skip: int = 0, limit: int = 20, keyword: Optional[str] = None, role: Optional[str] = None ) -> Tuple[List[UserResponse], int]: """ 获取用户列表 Args: skip: 跳过记录数 limit: 每页数量 keyword: 搜索关键词 role: 角色过滤 Returns: (用户列表,总记录数) """ # 构建查询条件 filters = [] if keyword: filters.append(or_( User.username.ilike(f"%{keyword}%"), User.email.ilike(f"%{keyword}%") )) if role: filters.append(User.role == role) # 执行查询 users, total = self.repository.list( filters=filters if filters else None, skip=skip, limit=limit ) return ( [UserResponse.model_validate(u) for u in users], total ) def update(self, user_id: str, user_data: UserUpdate) -> Optional[UserResponse]: """ 更新用户信息 Args: user_id: 用户 ID user_data: 更新数据 Returns: 更新后的用户信息,不存在返回 None """ user = self.repository.find_by_id(user_id) if not user: return None # 更新字段 update_data = user_data.model_dump(exclude_unset=True) if "password" in update_data: update_data["password_hash"] = hash_password(update_data.pop("password")) if "email" in update_data: update_data["email"] = update_data["email"].lower() updated_user = self.repository.update(user_id, update_data) self.db.commit() self.db.refresh(updated_user) return UserResponse.model_validate(updated_user) def delete(self, user_id: str) -> bool: """ 删除用户 Args: user_id: 用户 ID Returns: 删除成功返回 True,用户不存在返回 False """ deleted = self.repository.delete(user_id) if deleted: self.db.commit() return deleted def authenticate(self, username: str, password: str) -> Optional[UserResponse]: """ 用户认证 Args: username: 用户名 password: 明文密码 Returns: 认证成功返回用户信息,失败返回 None """ user = self.repository.find_by_username(username) if not user: return None if not verify_password(password, user.password_hash): return None return UserResponse.model_validate(user)
涉及多个数据库操作的業務邏輯,必須使用事務管理,確保數據一致性。使用 try-except-finally 確保異常時回滾。
Service 層不直接操作數據庫,必須通過 Repository 進行數據訪問,保持分層清晰。
業務異常拋出自定義 BusinessException,包含錯誤碼和詳細信息,便於前端處理。
Service 返回的必須是 DTO(Pydantic Schema),而不是直接返回 Entity,避免暴露內部結構。
Repository 层负责数据库 CRUD 操作,封装数据访问细节,提供简洁的接口给 Service 层调用。
# src/repositories/user_repository.py from sqlalchemy.orm import Session from sqlalchemy import select, func from typing import List, Tuple, Optional, Type from ..models.user import User from .base_repository import BaseRepository class UserRepository(BaseRepository[User]): """ 用户数据访问层 封装用户相关的数据库操作,提供 CRUD 和自定义查询方法 """ def __init__(self, db: Session): super().__init__(model=User, db=db) def find_by_username(self, username: str) -> Optional[User]: """根据用户名查找用户""" query = select(self.model).where(self.model.username == username) return self.db.execute(query).scalar_one_or_none() def find_by_email(self, email: str) -> Optional[User]: """根据邮箱查找用户""" query = select(self.model).where(self.model.email == email.lower()) return self.db.execute(query).scalar_one_or_none() def list_with_filters( self, filters: Optional[list] = None, skip: int = 0, limit: int = 20 ) -> Tuple[List[User], int]: """ 带条件过滤的用户列表查询 Args: filters: SQLAlchemy 过滤条件列表 skip: 跳过记录数 limit: 每页数量 Returns: (用户列表,总记录数) """ query = select(self.model) count_query = select(func.count(self.model.id)) if filters: query = query.where(*filters) count_query = count_query.where(*filters) # 获取总数 total = self.db.execute(count_query).scalar() # 分页查询 query = query.offset(skip).limit(limit).order_by(self.model.created_at.desc()) results = self.db.execute(query).scalars().all() return list(results), total # src/repositories/base_repository.py from sqlalchemy.orm import Session from sqlalchemy import select, delete from typing import Generic, TypeVar, Type, Optional, List from ..models.base import BaseModel T = TypeVar('T', bound=BaseModel) class BaseRepository(Generic[T]): """ 通用 Repository 基类 提供标准 CRUD 操作,子类继承后可直接使用 """ def __init__(self, model: Type[T], db: Session): self.model = model self.db = db def find_by_id(self, id: str) -> Optional[T]: """根据 ID 查找实体""" query = select(self.model).where(self.model.id == id) return self.db.execute(query).scalar_one_or_none() def create(self, entity: T) -> T: """创建新实体""" self.db.add(entity) self.db.flush() # 刷新以获取生成的 ID self.db.refresh(entity) return entity def update(self, id: str, update_data: dict) -> Optional[T]: """更新实体""" entity = self.find_by_id(id) if not entity: return None for field, value in update_data.items(): if hasattr(entity, field): setattr(entity, field, value) self.db.flush() self.db.refresh(entity) return entity def delete(self, id: str) -> bool: """删除实体""" entity = self.find_by_id(id) if not entity: return False self.db.delete(entity) return True def list( self, filters: Optional[list] = None, skip: int = 0, limit: int = 20 ) -> Tuple[List[T], int]: """通用列表查询""" from sqlalchemy import func query = select(self.model) count_query = select(func.count(self.model.id)) if filters: query = query.where(*filters) count_query = count_query.where(*filters) total = self.db.execute(count_query).scalar() results = self.db.execute( query.offset(skip).limit(limit).order_by(self.model.created_at.desc()) ).scalars().all() return list(results), total
所有 Repository 必须继承 BaseRepository,复用标准 CRUD 方法,只添加特定查询方法。
使用 select() 构造查询,不使用已废弃的 query() 方法。
Repository 返回的是 Entity 对象,不是 DTO。DTO 转换在 Service 层进行。
Repository 不负责 commit,由 Service 层控制事务边界。
# src/schemas/user_schema.py from pydantic import BaseModel, Field, EmailStr, validator from typing import Optional, List from datetime import datetime from enum import Enum class UserRole(Enum): """用户角色枚举""" USER = "user" ADMIN = "admin" SUPER_ADMIN = "super_admin" class UserBase(BaseModel): """用户基础 Schema""" username: str = Field( ..., min_length=3, max_length=20, pattern=r'^[a-zA-Z0-9_]+$', description="用户名 (3-20 字符,字母数字下划线)" ) email: EmailStr = Field(..., description="邮箱地址") role: UserRole = Field(default=UserRole.USER, description="用户角色") @validator('email') def validate_email_lowercase(cls, v): """邮箱转小写""" return v.lower() class UserCreate(UserBase): """用户创建请求 Schema""" password: str = Field( ..., min_length=8, max_length=128, description="密码 (最少 8 位)" ) @validator('password') def validate_password_strength(cls, v): """密码强度验证""" if not any(c.isupper() for c in v): raise ValueError("密码必须包含大写字母") if not any(c.islower() for c in v): raise ValueError("密码必须包含小写字母") if not any(c.isdigit() for c in v): raise ValueError("密码必须包含数字") return v class UserUpdate(BaseModel): """用户更新请求 Schema""" username: Optional[str] = Field(None, min_length=3, max_length=20) email: Optional[EmailStr] = None role: Optional[UserRole] = None password: Optional[str] = Field(None, min_length=8, max_length=128) class UserResponse(UserBase): """用户响应 Schema""" id: str created_at: datetime updated_at: Optional[datetime] = None class Config: from_attributes = True # 允许从 ORM 模型加载 class UserListResponse(BaseModel): """用户列表响应 Schema""" items: List[UserResponse] total: int skip: int limit: int
# src/models/user.py from sqlalchemy import Column, String, DateTime, Enum as SQLEnum from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.sql import func from ..models.base import BaseModel import uuid class User(BaseModel): """ 用户实体模型 对应数据库中的 users 表 """ __tablename__ = "users" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) username = Column(String(20), unique=True, index=True, nullable=False) email = Column(String(255), unique=True, index=True, nullable=False) password_hash = Column(String(255), nullable=False) role = Column(SQLEnum('user', 'admin', 'super_admin', name='user_role'), default='user') created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False) updated_at = Column(DateTime(timezone=True), onupdate=func.now()) def __repr__(self): return f"<User {self.username}>" def to_dict(self): """转换为字典(不包含敏感字段)""" return { "id": str(self.id), "username": self.username, "email": self.email, "role": self.role, "created_at": self.created_at.isoformat() if self.created_at else None }
| 类型 | 命名规则 | 示例 | 说明 |
|---|---|---|---|
| 变量/函数 | snake_case | user_name, get_user_by_id() |
小写字母 + 下划线分隔 |
| 类名 | PascalCase | UserService, UserRepository |
每个单词首字母大写 |
| 常量 | UPPER_SNAKE_CASE | MAX_PAGE_SIZE, DEFAULT_TIMEOUT |
全大写 + 下划线分隔 |
| 私有方法/变量 | _snake_case | _internal_cache, _validate_input() |
单下划线前缀表示私有 |
| URL 路径 | kebab-case | /api/v1/user-profiles |
小写字母 + 连字符分隔 |
| 数据库表名 | snake_case 复数 | users, order_items |
小写 + 下划线,使用复数形式 |
| 文件名 | snake_case | user_service.py, auth_middleware.py |
小写 + 下划线,见名知意 |
yonghu, yh_list)data, temp, a, b)userName 应改为 user_name)list, dict, str 作为变量名)密码必须使用 bcrypt 或 argon2 加密存储,禁止明文存储。密码长度至少 8 位,必须包含大小写字母和数字。
必须使用参数化查询或 ORM,禁止字符串拼接 SQL。所有用户输入必须经过验证和转义。
所有输出到前端的内容必须进行 HTML 转义。使用框架内置的转义函数,不手动拼接 HTML。
所有需要登录的接口必须添加 Depends(get_current_user)。敏感操作需要权限验证(如:管理员权限)。
日志中禁止打印密码、token、身份证号等敏感信息。返回给前端的数据要过滤掉敏感字段。
登录、注册、发送验证码等接口必须添加速率限制,防止暴力破解和 DDoS 攻击。
明确指定允许的源域名,不使用通配符 *。生产环境必须限制为实际使用的域名。
# src/api/middlewares/auth.py from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from jose import jwt, JWTError from ..core.config import settings from ..repositories.user_repository import UserRepository from ..core.database import get_db security = HTTPBearer() async def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db = Depends(get_db) ): """ 获取当前登录用户 验证 JWT Token 有效性,从 Token 中提取用户 ID 并查询用户信息 """ try: token = credentials.credentials payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]) user_id: str = payload.get("sub") if user_id is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail={"code": "INVALID_TOKEN", "message": "Token 无效"} ) repo = UserRepository(db) user = repo.find_by_id(user_id) if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail={"code": "USER_NOT_FOUND", "message": "用户不存在"} ) return user except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail={"code": "TOKEN_EXPIRED", "message": "Token 已过期"} ) def require_role(required_role: str): """ 角色权限装饰器 用法:@require_role("admin") """ async def role_checker(current_user = Depends(get_current_user)): if current_user.role != required_role and current_user.role != "super_admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail={"code": "INSUFFICIENT_PERMISSION", "message": "权限不足"} ) return current_user return role_checker
所有查询条件字段、外键字段、唯一约束字段必须添加索引。避免全表扫描。
使用 joinedload 或 selectinload 预加载关联数据,避免 N+1 查询问题。
列表接口必须支持分页,单次查询最大限制 100 条记录。大数据量使用游标分页。
热点数据使用 Redis 缓存,设置合理的 TTL。缓存穿透使用布隆过滤器或空值缓存。
IO 密集型操作(数据库、HTTP 请求、文件读写)必须使用异步方式,提高并发能力。
批量插入/更新使用 bulk_insert_mappings() 或 execute(),避免循环单条操作。
数据库连接池大小根据并发量合理配置,默认 10-20,避免连接泄漏。
def create_user(self, user_data: UserCreate) -> UserResponse: """ 创建新用户 详细描述(可选): 此方法会验证用户名和邮箱的唯一性,对密码进行加密, 然后创建用户记录并返回用户信息(不包含密码)。 Args: user_data (UserCreate): 用户创建数据,包含用户名、邮箱、密码等 Returns: UserResponse: 创建成功的用户信息 Raises: BusinessException: 当用户名或邮箱已存在时抛出 Example: >>> user_data = UserCreate(username="john", email="john@example.com", password="Pass123") >>> user = service.create(user_data) >>> print(user.username) 'john' Note: - 密码会自动加密存储 - 邮箱会自动转换为小写 - 默认角色为普通用户 (USER) """
from pydantic import BaseModel, Field, validator, ValidationError import re class ProductCreate(BaseModel): name: str = Field(..., min_length=2, max_length=200) price: float = Field(..., gt=0, le=1000000) stock: int = Field(default=0, ge=0) category: str tags: List[str] = Field(default_factory=list) @validator('name') def validate_name(cls, v): if not re.match(r'^[\w\s\-\u4e00-\u9fa5]+$', v): raise ValueError("商品名称只能包含字母、数字、中文、空格和连字符") return v.strip() @validator('tags') def validate_tags(cls, v): if len(v) > 10: raise ValueError("标签数量不能超过 10 个") return [tag.strip() for tag in v if tag.strip()] @validator('price') def validate_price(cls, v): return round(v, 2) # 保留两位小数
# src/core/exceptions.py from fastapi import HTTPException, status class BaseAPIException(HTTPException): """API 异常基类""" def __init__(self, code: str, message: str, status_code: int = 400, details: dict = None): super().__init__( status_code=status_code, detail={"code": code, "message": message, "details": details or {}} ) class BusinessException(BaseAPIException): """业务逻辑异常""" def __init__(self, code: str, message: str, details: dict = None): super().__init__(code, message, status.HTTP_400_BAD_REQUEST, details) class NotFoundException(BaseAPIException): """资源不存在异常""" def __init__(self, resource_type: str, resource_id: str): super().__init__( code="RESOURCE_NOT_FOUND", message=f"{resource_type} 不存在", status_code=status.HTTP_404_NOT_FOUND, details={"resource_type": resource_type, "resource_id": resource_id} ) class UnauthorizedException(BaseAPIException): """未授权异常""" def __init__(self, message: str = "未授权访问"): super().__init__( code="UNAUTHORIZED", message=message, status_code=status.HTTP_401_UNAUTHORIZED ) class ForbiddenException(BaseAPIException): """禁止访问异常""" def __init__(self, message: str = "权限不足"): super().__init__( code="FORBIDDEN", message=message, status_code=status.HTTP_403_FORBIDDEN )
# src/main.py from fastapi import FastAPI, Request from fastapi.responses import JSONResponse from .core.exceptions import BaseAPIException import logging logger = logging.getLogger(__name__) app = FastAPI() @app.exception_handler(BaseAPIException) async def api_exception_handler(request: Request, exc: BaseAPIException): """处理自定义 API 异常""" logger.warning(f"API Exception: {exc.detail}") return JSONResponse( status_code=exc.status_code, content={ "success": False, "error": exc.detail } ) @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): """处理未预期的异常""" logger.error(f"Unhandled exception: {str(exc)}", exc_info=True) return JSONResponse( status_code=500, content={ "success": False, "error": { "code": "INTERNAL_SERVER_ERROR", "message": "服务器内部错误" } } )
# tests/unit/test_user_service.py import pytest from unittest.mock import Mock, MagicMock, patch from src.services.user_service import UserService from src.schemas.user_schema import UserCreate, UserRole from src.core.exceptions import BusinessException class TestUserService: """UserService 单元测试类""" @pytest.fixture def mock_db(self): """模拟数据库会话""" db = Mock() db.commit = Mock() db.refresh = Mock() return db @pytest.fixture def user_service(self, mock_db): """UserService 实例""" with patch('src.services.user_service.UserRepository'): return UserService(mock_db) def test_create_user_success(self, user_service, mock_db): """测试成功创建用户""" # Arrange user_data = UserCreate( username="test_user", email="test@example.com", password="TestPass123" ) mock_repo = user_service.repository mock_repo.find_by_username.return_value = None mock_repo.create.return_value = Mock(id="123", username="test_user") # Act result = user_service.create(user_data) # Assert assert result.username == "test_user" mock_repo.find_by_username.assert_called_once_with("test_user") mock_repo.create.assert_called_once() mock_db.commit.assert_called_once() def test_create_user_duplicate(self, user_service, mock_db): """测试创建重复用户""" # Arrange user_data = UserCreate( username="existing_user", email="test@example.com", password="TestPass123" ) mock_repo = user_service.repository mock_repo.find_by_username.return_value = Mock() # Act & Assert with pytest.raises(BusinessException) as exc_info: user_service.create(user_data) assert exc_info.value.detail["code"] == "USER_ALREADY_EXISTS"
# .pre-commit-config.yaml
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/psf/black
rev: 24.1.0
hooks:
- id: black
language_version: python3.11
- repo: https://github.com/pycqa/isort
rev: 5.13.2
hooks:
- id: isort
args: ["--profile", "black"]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.8.0
hooks:
- id: mypy
additional_dependencies: [types-all]
- repo: https://github.com/PyCQA/bandit
rev: 1.7.7
hooks:
- id: bandit
args: ["-r", "src/"]