57 changes: 49 additions & 8 deletions llm_web_kit/api/dependencies.py
@@ -4,13 +4,27 @@
"""

import logging
import os
from contextvars import ContextVar
from functools import lru_cache
from logging.handlers import TimedRotatingFileHandler
from typing import Optional

from pydantic_settings import BaseSettings, SettingsConfigDict

logger = logging.getLogger(__name__)

# Create a ContextVar to hold the request_id, with a default value
request_id_var: ContextVar[Optional[str]] = ContextVar("request_id", default=None)


class RequestIdFilter(logging.Filter):
"""日志过滤器,用于将 request_id 从 ContextVar 注入到日志记录中。"""

def filter(self, record):
record.request_id = request_id_var.get()
return True


class Settings(BaseSettings):
"""应用配置设置."""
@@ -27,6 +41,8 @@ class Settings(BaseSettings):

# Logging configuration
log_level: str = "INFO"
log_dir: str = "logs"
log_filename: str = "api.log"

# Model configuration
model_path: Optional[str] = None
@@ -38,8 +54,8 @@

# Database configuration
database_url: Optional[str] = None # read from the DATABASE_URL environment variable
db_pool_size: int = 5
db_max_overflow: int = 10
db_pool_size: int = 200
db_max_overflow: int = 100

# pydantic v2 configuration style
model_config = SettingsConfigDict(
@@ -57,14 +73,39 @@ def get_settings() -> Settings:
def get_logger(name: str = __name__) -> logging.Logger:
"""获取配置好的日志记录器."""
logger = logging.getLogger(name)
logger.setLevel(get_settings().log_level)
logger.addFilter(RequestIdFilter()) # attach the request_id filter

if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
# Console handler
stream_handler = logging.StreamHandler()
stream_formatter = logging.Formatter(
'%(asctime)s - %(request_id)s - %(name)s - %(levelname)s - %(message)s'
)
stream_handler.setFormatter(stream_formatter)
logger.addHandler(stream_handler)

# File handler (rotates daily)
settings = get_settings()
log_dir = settings.log_dir
if not os.path.exists(log_dir):
os.makedirs(log_dir)

log_file_path = os.path.join(log_dir, settings.log_filename)

file_handler = TimedRotatingFileHandler(
log_file_path,
when="midnight", # 每天午夜轮换
interval=1,
backupCount=30, # keep 30 days of logs
encoding='utf-8'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(get_settings().log_level)
file_formatter = logging.Formatter(
'%(asctime)s - %(request_id)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)

return logger


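Side note on the pattern: the ContextVar-plus-Filter combination above is easy to verify in isolation. A minimal, self-contained sketch (names mirror the diff; the demo logger and output comments are illustrative):

import logging
from contextvars import ContextVar
from typing import Optional

request_id_var: ContextVar[Optional[str]] = ContextVar("request_id", default=None)

class RequestIdFilter(logging.Filter):
    def filter(self, record):
        record.request_id = request_id_var.get()  # None outside a request
        return True

demo = logging.getLogger("demo")
demo.addFilter(RequestIdFilter())
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(request_id)s - %(levelname)s - %(message)s'))
demo.addHandler(handler)
demo.setLevel("INFO")

token = request_id_var.set("req-123")
demo.info("handling a request")   # ... - req-123 - INFO - handling a request
request_id_var.reset(token)
demo.info("between requests")     # ... - None - INFO - between requests

One caveat: the filter is attached to each logger returned by get_logger, so records reaching the same handlers through a logger without the filter will lack the request_id attribute and the %-style formatter will fail for them; attaching the filter to the handlers instead would close that gap.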
30 changes: 28 additions & 2 deletions llm_web_kit/api/main.py
@@ -4,12 +4,14 @@
"""

import uvicorn
from fastapi import FastAPI
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from .dependencies import get_inference_service, get_logger, get_settings
from .dependencies import (get_inference_service, get_logger, get_settings,
request_id_var)
from .routers import htmls
from .services.request_log_service import RequestLogService

settings = get_settings()
logger = get_logger(__name__)
@@ -33,6 +35,30 @@
allow_headers=["*"],
)


@app.middleware("http")
async def request_id_middleware(request: Request, call_next):
"""中间件,用于生成 request_id 并通过 ContextVar 在整个请求周期中传递。"""
# Read the request_id from the request headers; generate a new one if absent
request_id = request.headers.get("X-Request-ID")
if not request_id:
request_id = RequestLogService._generate_request_id()

# Store the request_id in the ContextVar
token = request_id_var.set(request_id)

# Process the request
response = await call_next(request)

# Echo the request_id in the response headers
response.headers["X-Request-ID"] = request_id

# Reset the ContextVar
request_id_var.reset(token)

return response


# Register routers
app.include_router(htmls.router, prefix="/api/v1", tags=["HTML Processing"])

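The middleware is straightforward to exercise end to end. A sketch using FastAPI's test client against the /api/v1/html/status route from the htmls router (assumes the app imports cleanly outside its deployment environment):

from fastapi.testclient import TestClient

from llm_web_kit.api.main import app

client = TestClient(app)

# A caller-supplied ID is propagated and echoed back
resp = client.get("/api/v1/html/status", headers={"X-Request-ID": "req-abc"})
assert resp.headers["X-Request-ID"] == "req-abc"

# Without the header, the middleware generates one
resp = client.get("/api/v1/html/status")
assert resp.headers["X-Request-ID"]

The reset(token) call is mostly defensive: under Starlette each request runs in its own contextvars context, so the variable does not leak across requests, but resetting keeps the context clean if anything else runs in it after the response.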
137 changes: 63 additions & 74 deletions llm_web_kit/api/routers/htmls.py
@@ -3,13 +3,17 @@
API endpoints for HTML parsing, content extraction, and related functionality.
"""

import base64
import html
import time
from typing import Optional

from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from fastapi import (APIRouter, BackgroundTasks, Body, Depends, File,
HTTPException, UploadFile)
from sqlalchemy.ext.asyncio import AsyncSession

from ..database import get_db_session
from ..dependencies import get_logger, get_settings
from ..dependencies import get_logger, get_settings, request_id_var
from ..models.request import HTMLParseRequest
from ..models.response import HTMLParseResponse
from ..services.html_service import HTMLService
@@ -23,16 +27,20 @@

@router.post('/html/parse', response_model=HTMLParseResponse)
async def parse_html(
request: HTMLParseRequest,
html_service: HTMLService = Depends(HTMLService),
db_session: Optional[AsyncSession] = Depends(get_db_session)
background_tasks: BackgroundTasks,
request: HTMLParseRequest = Body(...),
html_service: HTMLService = Depends(HTMLService),
db_session: Optional[AsyncSession] = Depends(get_db_session)
):
"""解析 HTML 内容.

接收 HTML 字符串并返回解析后的结构化内容。
"""
# Generate a request ID
request_id = RequestLogService.generate_request_id()
# Get the request_id from the context var
request_id = request_id_var.get()
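# Decode the base64 payload to UTF-8, then unescape HTML entities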
decoded_bytes = base64.b64decode(request.html_content)
decoded_str = decoded_bytes.decode('utf-8')
unescaped_html = html.unescape(decoded_str)

# Determine the input type
if request.html_content:
@@ -43,35 +51,32 @@ async def parse_html(
input_type = 'unknown'

# Create the request log
await RequestLogService.create_log(
start_time = time.time()
await RequestLogService.initial_log(
session=db_session,
request_id=request_id,
input_type=input_type,
input_html=request.html_content,
input_html=unescaped_html,
url=request.url,
)

# Commit immediately so the processing status is visible in the database
if db_session:
try:
await db_session.commit()
except Exception as commit_error:
logger.error(f'Error committing the initial log: {commit_error}')
end_time = time.time()
logger.info(f'Log creation took {end_time - start_time} seconds')

try:
logger.info(f'Starting HTML parse [request_id={request_id}], content length: {len(request.html_content) if request.html_content else 0}')
logger.info(f'Starting HTML parse, content length: {len(unescaped_html) if unescaped_html else 0}')

result = await html_service.parse_html(
html_content=request.html_content,
html_content=unescaped_html,
url=request.url,
request_id=request_id,
options=request.options
)

# Mark the log as successful
await RequestLogService.update_log_success(
session=db_session,
request_id=request_id,
output_markdown=result.get('markdown'),
# Hand the success-log update off to a background task
background_tasks.add_task(
RequestLogService.log_success_bg,
request_id,
result.get('markdown')
)

return HTMLParseResponse(
@@ -81,37 +86,32 @@ async def parse_html(
request_id=request_id
)
except Exception as e:
logger.error(f'HTML parsing failed [request_id={request_id}]: {str(e)}')

# Mark the log as failed
await RequestLogService.update_log_failure(
session=db_session,
request_id=request_id,
error_message=str(e),
error_message = str(e)
logger.error(f'HTML parsing failed: {error_message}')

# Hand the failure-log update off to a background task
background_tasks.add_task(
RequestLogService.log_failure_bg,
request_id,
error_message
)

# Commit manually to ensure the failure log is persisted
if db_session:
try:
await db_session.commit()
except Exception as commit_error:
logger.error(f'Error committing the failure log: {commit_error}')

raise HTTPException(status_code=500, detail=f'HTML parsing failed: {str(e)}')
raise HTTPException(status_code=500, detail=f'HTML parsing failed: {error_message}')
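Since the endpoint now base64-decodes and HTML-unescapes html_content before parsing, callers must encode accordingly. A hypothetical client sketch (field names follow HTMLParseRequest; the URL, and the escape step matching the server's html.unescape, are assumptions about the intended contract):

import base64
import html

import requests  # assumed HTTP client

raw_html = '<p>Hello &amp; welcome</p>'
payload = {
    'html_content': base64.b64encode(html.escape(raw_html).encode('utf-8')).decode('ascii'),
    'url': 'https://example.com/page',
}
resp = requests.post('http://localhost:8000/api/v1/html/parse', json=payload)
print(resp.json().get('request_id'))

Two details worth flagging: the decode runs before the if request.html_content: branch, so an empty body fails inside b64decode instead of reaching the 'unknown' input type; and html.unescape on raw, unescaped HTML would rewrite genuine entities such as &amp;, so the encoding contract needs to be agreed with clients.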


@router.post('/html/upload')
async def upload_html_file(
file: UploadFile = File(...),
html_service: HTMLService = Depends(HTMLService),
db_session: Optional[AsyncSession] = Depends(get_db_session)
background_tasks: BackgroundTasks,
file: UploadFile = File(...),
html_service: HTMLService = Depends(HTMLService),
db_session: Optional[AsyncSession] = Depends(get_db_session)
):
"""上传 HTML 文件进行解析.

支持上传 HTML 文件,自动解析并返回结果。
"""
# Generate a request ID
request_id = RequestLogService.generate_request_id()
# Get the request_id from the context var
request_id = request_id_var.get()

try:
if not file.filename.endswith(('.html', '.htm')):
@@ -120,31 +120,26 @@ async def upload_html_file(
content = await file.read()
html_content = content.decode('utf-8')

logger.info(f'Uploaded HTML file [request_id={request_id}]: {file.filename}, size: {len(content)} bytes')

logger.info(f'Uploaded HTML file: {file.filename}, size: {len(content)} bytes')
start_time = time.time()
# Create the request log
await RequestLogService.create_log(
await RequestLogService.initial_log(
session=db_session,
request_id=request_id,
input_type='file',
input_html=html_content,
url=None,
)
end_time = time.time()
logger.info(f'Log creation took {end_time - start_time} seconds')

# Commit immediately so the processing status is visible in the database
if db_session:
try:
await db_session.commit()
except Exception as commit_error:
logger.error(f'Error committing the initial log: {commit_error}')

result = await html_service.parse_html(html_content=html_content, url="www.baidu.com")
result = await html_service.parse_html(html_content=html_content, url="www.baidu.com", request_id=request_id)

# Mark the log as successful
await RequestLogService.update_log_success(
session=db_session,
request_id=request_id,
output_markdown=result.get('markdown'),
# Hand the success-log update off to a background task
background_tasks.add_task(
RequestLogService.log_success_bg,
request_id,
result.get('markdown')
)

return HTMLParseResponse(
@@ -154,23 +149,17 @@ async def upload_html_file(
request_id=request_id
)
except Exception as e:
logger.error(f'HTML file parsing failed [request_id={request_id}]: {str(e)}')

# Mark the log as failed
await RequestLogService.update_log_failure(
session=db_session,
request_id=request_id,
error_message=str(e),
error_message = str(e)
logger.error(f'HTML file parsing failed: {error_message}')

# Hand the failure-log update off to a background task
background_tasks.add_task(
RequestLogService.log_failure_bg,
request_id,
error_message
)

# Commit manually to ensure the failure log is persisted
if db_session:
try:
await db_session.commit()
except Exception as commit_error:
logger.error(f'Error committing the failure log: {commit_error}')

raise HTTPException(status_code=500, detail=f'HTML file parsing failed: {str(e)}')
raise HTTPException(status_code=500, detail=f'HTML file parsing failed: {error_message}')
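The log_success_bg / log_failure_bg helpers the background tasks call are not shown in this diff. Because BackgroundTasks run after the response is sent, when the request-scoped session from get_db_session may already be closed, they presumably open their own session. A hypothetical sketch (async_session_factory and the method bodies are assumptions, not code from this PR; update_log_success and update_log_failure are the methods the removed code called):

# async_session_factory is an assumed async_sessionmaker; not shown in this PR
from ..database import async_session_factory

class RequestLogService:
    @staticmethod
    async def log_success_bg(request_id: str, output_markdown: str) -> None:
        # Open a fresh session: the request-scoped one is gone by now
        async with async_session_factory() as session:
            await RequestLogService.update_log_success(
                session=session,
                request_id=request_id,
                output_markdown=output_markdown,
            )
            await session.commit()

    @staticmethod
    async def log_failure_bg(request_id: str, error_message: str) -> None:
        async with async_session_factory() as session:
            await RequestLogService.update_log_failure(
                session=session,
                request_id=request_id,
                error_message=error_message,
            )
            await session.commit()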


@router.get('/html/status')