Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions src/web/routes/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,10 @@ def _raise_if_cancelled(reason: str = "任务已取消") -> None:
service_type = EmailServiceType(email_service_type)
settings = get_settings()

# 批量任务可能已在创建阶段绑定了独立邮箱服务 ID(例如 Outlook 批量注册)。
if not email_service_id and getattr(task, "email_service_id", None):
email_service_id = int(task.email_service_id)

# 优先使用数据库中配置的邮箱服务
if email_service_id:
from ...database.models import EmailService as EmailServiceModel
Expand Down Expand Up @@ -900,6 +904,21 @@ def update_batch_status(**kwargs):
return add_batch_log, update_batch_status


def _resolve_task_email_service_id(
task_uuid: str,
fallback_email_service_id: Optional[int],
) -> Optional[int]:
"""优先使用任务自身绑定的邮箱服务,缺失时再回退到批量级配置。"""
try:
with get_db() as db:
task = crud.get_registration_task(db, task_uuid)
if task and getattr(task, "email_service_id", None):
return int(task.email_service_id)
except Exception as exc:
logger.warning("读取任务绑定邮箱服务失败 %s: %s", task_uuid, exc)
return fallback_email_service_id


async def run_batch_parallel(
batch_id: str,
task_uuids: List[str],
Expand Down Expand Up @@ -942,8 +961,9 @@ async def _run_one(idx: int, uuid: str):
task_manager.cancel_task(uuid)
task_manager.update_status(uuid, "cancelled", error="批量任务已取消")
return
resolved_email_service_id = _resolve_task_email_service_id(uuid, email_service_id)
await run_registration_task(
uuid, email_service_type, proxy, email_service_config, email_service_id,
uuid, email_service_type, proxy, email_service_config, resolved_email_service_id,
log_prefix=prefix, batch_id=batch_id,
auto_upload_cpa=auto_upload_cpa, cpa_service_ids=cpa_service_ids or [],
auto_upload_sub2api=auto_upload_sub2api, sub2api_service_ids=sub2api_service_ids or [],
Expand Down Expand Up @@ -1025,8 +1045,9 @@ async def _run_and_release(idx: int, uuid: str, pfx: str):
task_manager.cancel_task(uuid)
task_manager.update_status(uuid, "cancelled", error="批量任务已取消")
return
resolved_email_service_id = _resolve_task_email_service_id(uuid, email_service_id)
await run_registration_task(
uuid, email_service_type, proxy, email_service_config, email_service_id,
uuid, email_service_type, proxy, email_service_config, resolved_email_service_id,
log_prefix=pfx, batch_id=batch_id,
auto_upload_cpa=auto_upload_cpa, cpa_service_ids=cpa_service_ids or [],
auto_upload_sub2api=auto_upload_sub2api, sub2api_service_ids=sub2api_service_ids or [],
Expand Down Expand Up @@ -2112,6 +2133,7 @@ async def run_outlook_batch_registration(
tm_service_ids: List[int] = None,
auto_upload_new_api: bool = False,
new_api_service_ids: List[int] = None,
registration_type: str = RoleTag.CHILD.value,
):
"""
异步执行 Outlook 批量注册任务,复用通用并发逻辑
Expand Down Expand Up @@ -2157,6 +2179,7 @@ async def run_outlook_batch_registration(
tm_service_ids=tm_service_ids,
auto_upload_new_api=auto_upload_new_api,
new_api_service_ids=new_api_service_ids,
registration_type=registration_type,
)


Expand Down Expand Up @@ -2392,4 +2415,3 @@ async def delete_scheduled_registration_job(job_uuid: str):
raise HTTPException(status_code=400, detail="无法删除执行中的计划任务")
crud.delete_scheduled_registration_job(db, job_uuid)
return {'success': True, 'message': '计划任务已删除'}

151 changes: 151 additions & 0 deletions tests/test_outlook_batch_registration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import asyncio

from src.web.routes import registration


def test_start_outlook_batch_registration_schedules_registration_type(monkeypatch):
captured = {}

def fake_schedule(background_tasks, coroutine_func, *args):
captured["background_tasks"] = background_tasks
captured["coroutine_func"] = coroutine_func
captured["args"] = args

monkeypatch.setattr(registration, "_schedule_async_job", fake_schedule)

request = registration.OutlookBatchRegistrationRequest(
service_ids=[1, 2, 3],
skip_registered=False,
registration_type="parent",
)

response = asyncio.run(registration._start_outlook_batch_registration_internal(request))

try:
assert response.to_register == 3
assert response.service_ids == [1, 2, 3]
assert captured["coroutine_func"] is registration.run_outlook_batch_registration
assert captured["args"][-1] == "parent"
finally:
registration.batch_tasks.pop(response.batch_id, None)


def test_run_outlook_batch_registration_passes_registration_type(monkeypatch):
created_service_ids = []
captured = {}

class DummyDb:
def __enter__(self):
return self

def __exit__(self, exc_type, exc, tb):
return False

def fake_get_db():
return DummyDb()

def fake_create_registration_task(db, task_uuid, proxy, email_service_id):
created_service_ids.append(email_service_id)

async def fake_run_batch_registration(**kwargs):
captured.update(kwargs)

monkeypatch.setattr(registration, "get_db", fake_get_db)
monkeypatch.setattr(registration.crud, "create_registration_task", fake_create_registration_task)
monkeypatch.setattr(registration, "run_batch_registration", fake_run_batch_registration)

asyncio.run(
registration.run_outlook_batch_registration(
batch_id="batch-1",
service_ids=[1, 2, 3],
skip_registered=False,
proxy=None,
interval_min=5,
interval_max=30,
concurrency=2,
mode="pipeline",
registration_type="parent",
)
)

assert created_service_ids == [1, 2, 3]
assert captured["email_service_type"] == "outlook"
assert captured["registration_type"] == "parent"


def test_resolve_task_email_service_id_prefers_task_binding(monkeypatch):
class DummyDb:
def __enter__(self):
return self

def __exit__(self, exc_type, exc, tb):
return False

class DummyTask:
email_service_id = 42

monkeypatch.setattr(registration, "get_db", lambda: DummyDb())
monkeypatch.setattr(registration.crud, "get_registration_task", lambda db, task_uuid: DummyTask())

assert registration._resolve_task_email_service_id("task-1", None) == 42


def test_run_batch_pipeline_uses_task_bound_email_service_ids(monkeypatch):
captured = []

class DummyDb:
def __enter__(self):
return self

def __exit__(self, exc_type, exc, tb):
return False

class DummyTask:
status = "completed"
error_message = None

async def fake_run_registration_task(
task_uuid,
email_service_type,
proxy,
email_service_config,
email_service_id=None,
**kwargs,
):
captured.append((task_uuid, email_service_id))

monkeypatch.setattr(registration, "run_registration_task", fake_run_registration_task)
monkeypatch.setattr(
registration,
"_resolve_task_email_service_id",
lambda task_uuid, fallback: {"task-1": 101, "task-2": 202}[task_uuid],
)
monkeypatch.setattr(registration, "get_db", lambda: DummyDb())
monkeypatch.setattr(registration.crud, "get_registration_task", lambda db, task_uuid: DummyTask())
monkeypatch.setattr(registration.task_manager, "init_batch", lambda batch_id, total: None)
monkeypatch.setattr(registration.task_manager, "add_batch_log", lambda batch_id, message: None)
monkeypatch.setattr(registration.task_manager, "update_batch_status", lambda batch_id, **kwargs: None)
monkeypatch.setattr(registration.task_manager, "is_batch_cancelled", lambda batch_id: False)
monkeypatch.setattr(registration.task_manager, "cancel_task", lambda task_uuid: None)
monkeypatch.setattr(registration.task_manager, "update_status", lambda task_uuid, status, error=None: None)
monkeypatch.setattr(registration.random, "randint", lambda a, b: 0)

asyncio.run(
registration.run_batch_pipeline(
batch_id="batch-pipeline",
task_uuids=["task-1", "task-2"],
email_service_type="outlook",
proxy=None,
email_service_config=None,
email_service_id=None,
interval_min=0,
interval_max=0,
concurrency=1,
registration_type="child",
)
)

try:
assert captured == [("task-1", 101), ("task-2", 202)]
finally:
registration.batch_tasks.pop("batch-pipeline", None)