diff --git a/src/web/routes/registration.py b/src/web/routes/registration.py index be9934ab..e9cf978d 100644 --- a/src/web/routes/registration.py +++ b/src/web/routes/registration.py @@ -404,6 +404,10 @@ def _raise_if_cancelled(reason: str = "任务已取消") -> None: service_type = EmailServiceType(email_service_type) settings = get_settings() + # 批量任务可能已在创建阶段绑定了独立邮箱服务 ID(例如 Outlook 批量注册)。 + if not email_service_id and getattr(task, "email_service_id", None): + email_service_id = int(task.email_service_id) + # 优先使用数据库中配置的邮箱服务 if email_service_id: from ...database.models import EmailService as EmailServiceModel @@ -900,6 +904,21 @@ def update_batch_status(**kwargs): return add_batch_log, update_batch_status +def _resolve_task_email_service_id( + task_uuid: str, + fallback_email_service_id: Optional[int], +) -> Optional[int]: + """优先使用任务自身绑定的邮箱服务,缺失时再回退到批量级配置。""" + try: + with get_db() as db: + task = crud.get_registration_task(db, task_uuid) + if task and getattr(task, "email_service_id", None): + return int(task.email_service_id) + except Exception as exc: + logger.warning("读取任务绑定邮箱服务失败 %s: %s", task_uuid, exc) + return fallback_email_service_id + + async def run_batch_parallel( batch_id: str, task_uuids: List[str], @@ -942,8 +961,9 @@ async def _run_one(idx: int, uuid: str): task_manager.cancel_task(uuid) task_manager.update_status(uuid, "cancelled", error="批量任务已取消") return + resolved_email_service_id = _resolve_task_email_service_id(uuid, email_service_id) await run_registration_task( - uuid, email_service_type, proxy, email_service_config, email_service_id, + uuid, email_service_type, proxy, email_service_config, resolved_email_service_id, log_prefix=prefix, batch_id=batch_id, auto_upload_cpa=auto_upload_cpa, cpa_service_ids=cpa_service_ids or [], auto_upload_sub2api=auto_upload_sub2api, sub2api_service_ids=sub2api_service_ids or [], @@ -1025,8 +1045,9 @@ async def _run_and_release(idx: int, uuid: str, pfx: str): task_manager.cancel_task(uuid) task_manager.update_status(uuid, "cancelled", error="批量任务已取消") return + resolved_email_service_id = _resolve_task_email_service_id(uuid, email_service_id) await run_registration_task( - uuid, email_service_type, proxy, email_service_config, email_service_id, + uuid, email_service_type, proxy, email_service_config, resolved_email_service_id, log_prefix=pfx, batch_id=batch_id, auto_upload_cpa=auto_upload_cpa, cpa_service_ids=cpa_service_ids or [], auto_upload_sub2api=auto_upload_sub2api, sub2api_service_ids=sub2api_service_ids or [], @@ -2112,6 +2133,7 @@ async def run_outlook_batch_registration( tm_service_ids: List[int] = None, auto_upload_new_api: bool = False, new_api_service_ids: List[int] = None, + registration_type: str = RoleTag.CHILD.value, ): """ 异步执行 Outlook 批量注册任务,复用通用并发逻辑 @@ -2157,6 +2179,7 @@ async def run_outlook_batch_registration( tm_service_ids=tm_service_ids, auto_upload_new_api=auto_upload_new_api, new_api_service_ids=new_api_service_ids, + registration_type=registration_type, ) @@ -2392,4 +2415,3 @@ async def delete_scheduled_registration_job(job_uuid: str): raise HTTPException(status_code=400, detail="无法删除执行中的计划任务") crud.delete_scheduled_registration_job(db, job_uuid) return {'success': True, 'message': '计划任务已删除'} - diff --git a/tests/test_outlook_batch_registration.py b/tests/test_outlook_batch_registration.py new file mode 100644 index 00000000..b03b10fb --- /dev/null +++ b/tests/test_outlook_batch_registration.py @@ -0,0 +1,151 @@ +import asyncio + +from src.web.routes import registration + + +def test_start_outlook_batch_registration_schedules_registration_type(monkeypatch): + captured = {} + + def fake_schedule(background_tasks, coroutine_func, *args): + captured["background_tasks"] = background_tasks + captured["coroutine_func"] = coroutine_func + captured["args"] = args + + monkeypatch.setattr(registration, "_schedule_async_job", fake_schedule) + + request = registration.OutlookBatchRegistrationRequest( + service_ids=[1, 2, 3], + skip_registered=False, + registration_type="parent", + ) + + response = asyncio.run(registration._start_outlook_batch_registration_internal(request)) + + try: + assert response.to_register == 3 + assert response.service_ids == [1, 2, 3] + assert captured["coroutine_func"] is registration.run_outlook_batch_registration + assert captured["args"][-1] == "parent" + finally: + registration.batch_tasks.pop(response.batch_id, None) + + +def test_run_outlook_batch_registration_passes_registration_type(monkeypatch): + created_service_ids = [] + captured = {} + + class DummyDb: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def fake_get_db(): + return DummyDb() + + def fake_create_registration_task(db, task_uuid, proxy, email_service_id): + created_service_ids.append(email_service_id) + + async def fake_run_batch_registration(**kwargs): + captured.update(kwargs) + + monkeypatch.setattr(registration, "get_db", fake_get_db) + monkeypatch.setattr(registration.crud, "create_registration_task", fake_create_registration_task) + monkeypatch.setattr(registration, "run_batch_registration", fake_run_batch_registration) + + asyncio.run( + registration.run_outlook_batch_registration( + batch_id="batch-1", + service_ids=[1, 2, 3], + skip_registered=False, + proxy=None, + interval_min=5, + interval_max=30, + concurrency=2, + mode="pipeline", + registration_type="parent", + ) + ) + + assert created_service_ids == [1, 2, 3] + assert captured["email_service_type"] == "outlook" + assert captured["registration_type"] == "parent" + + +def test_resolve_task_email_service_id_prefers_task_binding(monkeypatch): + class DummyDb: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + class DummyTask: + email_service_id = 42 + + monkeypatch.setattr(registration, "get_db", lambda: DummyDb()) + monkeypatch.setattr(registration.crud, "get_registration_task", lambda db, task_uuid: DummyTask()) + + assert registration._resolve_task_email_service_id("task-1", None) == 42 + + +def test_run_batch_pipeline_uses_task_bound_email_service_ids(monkeypatch): + captured = [] + + class DummyDb: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + class DummyTask: + status = "completed" + error_message = None + + async def fake_run_registration_task( + task_uuid, + email_service_type, + proxy, + email_service_config, + email_service_id=None, + **kwargs, + ): + captured.append((task_uuid, email_service_id)) + + monkeypatch.setattr(registration, "run_registration_task", fake_run_registration_task) + monkeypatch.setattr( + registration, + "_resolve_task_email_service_id", + lambda task_uuid, fallback: {"task-1": 101, "task-2": 202}[task_uuid], + ) + monkeypatch.setattr(registration, "get_db", lambda: DummyDb()) + monkeypatch.setattr(registration.crud, "get_registration_task", lambda db, task_uuid: DummyTask()) + monkeypatch.setattr(registration.task_manager, "init_batch", lambda batch_id, total: None) + monkeypatch.setattr(registration.task_manager, "add_batch_log", lambda batch_id, message: None) + monkeypatch.setattr(registration.task_manager, "update_batch_status", lambda batch_id, **kwargs: None) + monkeypatch.setattr(registration.task_manager, "is_batch_cancelled", lambda batch_id: False) + monkeypatch.setattr(registration.task_manager, "cancel_task", lambda task_uuid: None) + monkeypatch.setattr(registration.task_manager, "update_status", lambda task_uuid, status, error=None: None) + monkeypatch.setattr(registration.random, "randint", lambda a, b: 0) + + asyncio.run( + registration.run_batch_pipeline( + batch_id="batch-pipeline", + task_uuids=["task-1", "task-2"], + email_service_type="outlook", + proxy=None, + email_service_config=None, + email_service_id=None, + interval_min=0, + interval_max=0, + concurrency=1, + registration_type="child", + ) + ) + + try: + assert captured == [("task-1", 101), ("task-2", 202)] + finally: + registration.batch_tasks.pop("batch-pipeline", None)