diff --git a/src/agent/manager.py b/src/agent/manager.py index 52ab7e0..964458f 100644 --- a/src/agent/manager.py +++ b/src/agent/manager.py @@ -47,6 +47,7 @@ ProviderModelCacheEntry, ProviderModelCompatibilityRecord, ) +from src.utils.json import safe_json_dumps logger = logging.getLogger(__name__) @@ -133,7 +134,7 @@ class _ToolTracker: _tool_id_to_name: dict[str, str] = field(default_factory=dict, init=False) async def _put(self, payload: dict) -> None: - await self.queue.put(f"data: {json.dumps(payload, ensure_ascii=False)}\n\n") + await self.queue.put(f"data: {safe_json_dumps(payload, ensure_ascii=False)}\n\n") await asyncio.sleep(0) async def on_first_event(self) -> None: @@ -268,7 +269,7 @@ async def _ticker() -> None: "Timeout extended +%.0fs (SDK activity, was %.0fs left, %d extensions left)", activity_extend, remaining, extensions_remaining, ) - ext_payload = json.dumps( + ext_payload = safe_json_dumps( { "type": "status", "text": f"Агент активен — продлён (+{int(activity_extend)}с)", @@ -287,7 +288,7 @@ async def _ticker() -> None: and (now - api_request_ts[0]) > thinking_delay ): _thinking_shown = True - thinking_payload = json.dumps( + thinking_payload = safe_json_dumps( {"type": "thinking", "text": "Думает..."}, ensure_ascii=False, ) @@ -298,7 +299,7 @@ async def _ticker() -> None: # Show countdown remaining_int = int(deadline - time.monotonic()) if remaining_int > 0: - payload = json.dumps( + payload = safe_json_dumps( {"type": "countdown", "text": f"{label} ({remaining_int}с до таймаута)"}, ensure_ascii=False, ) @@ -482,7 +483,7 @@ def _on_stderr(line: str) -> None: _api_request_count[0] += 1 _api_request_ts[0] = time.monotonic() label = f"Жду ответ Claude API #{_api_request_count[0]}: «{_prompt_short}»" - payload = json.dumps({"type": "status", "text": label}, ensure_ascii=False) + payload = safe_json_dumps({"type": "status", "text": label}, ensure_ascii=False) try: queue.put_nowait(f"data: {payload}\n\n") except Exception: @@ -496,7 +497,7 @@ def _on_stderr(line: str) -> None: break if label and label != _last_emitted[0]: _last_emitted[0] = label - payload = json.dumps({"type": "status", "text": label}, ensure_ascii=False) + payload = safe_json_dumps({"type": "status", "text": label}, ensure_ascii=False) try: queue.put_nowait(f"data: {payload}\n\n") except Exception: @@ -512,7 +513,7 @@ def _on_stderr(line: str) -> None: for tag in ("[WARN]", "[warn]", "[ERROR]", "[error]"): display = display.replace(tag, "").strip() if display: - warn_payload = json.dumps( + warn_payload = safe_json_dumps( {"type": "warning", "text": display}, ensure_ascii=False, ) @@ -660,7 +661,7 @@ def _on_stderr(line: str) -> None: _last_rate_limit[0] = rl_summary if rl_status == "rejected": # Hard reject — surface as warning, not just status - warn_payload = json.dumps( + warn_payload = safe_json_dumps( {"type": "warning", "text": f"⛔ {rl_text}. API отклоняет запросы."}, ensure_ascii=False, ) @@ -697,7 +698,7 @@ def _on_stderr(line: str) -> None: _last_activity[0] = time.monotonic() full_text += text_chunk streamed = True - chunk_payload = json.dumps( + chunk_payload = safe_json_dumps( {"text": text_chunk}, ensure_ascii=False ) await queue.put(f"data: {chunk_payload}\n\n") @@ -727,7 +728,7 @@ def _on_stderr(line: str) -> None: for _idx, block in enumerate(msg.content): if isinstance(block, TextBlock) and not streamed: full_text += block.text - chunk_payload = json.dumps( + chunk_payload = safe_json_dumps( {"text": block.text}, ensure_ascii=False ) await queue.put(f"data: {chunk_payload}\n\n") @@ -739,7 +740,7 @@ def _on_stderr(line: str) -> None: block.name, _idx, tool_use_id=block.id ) tracker.accumulate_input( - json.dumps(block.input or {}, ensure_ascii=False) + safe_json_dumps(block.input or {}, ensure_ascii=False) ) await tracker.on_block_stop(_idx) elif isinstance(block, ToolResultBlock): @@ -771,7 +772,7 @@ def _on_stderr(line: str) -> None: _sid = getattr(msg, "session_id", None) if isinstance(_sid, str) and _sid: done_data["session_id"] = _sid - done_payload = json.dumps(done_data, ensure_ascii=False) + done_payload = safe_json_dumps(done_data, ensure_ascii=False) await queue.put(f"data: {done_payload}\n\n") else: logger.warning( @@ -791,7 +792,7 @@ def _on_stderr(line: str) -> None: "Agent timeout after %.1fs (thread %d): %s", elapsed, thread_id, exc, ) stderr_summary = "\n".join(stderr_lines[-10:]) if stderr_lines else None - err_payload = json.dumps( + err_payload = safe_json_dumps( {"error": str(exc), "details": stderr_summary}, ensure_ascii=False, ) @@ -806,7 +807,7 @@ def _on_stderr(line: str) -> None: "Установите: npm install -g @anthropic-ai/claude-code" ) await queue.put( - f"data: {json.dumps({'error': err_msg}, ensure_ascii=False)}\n\n" + f"data: {safe_json_dumps({'error': err_msg}, ensure_ascii=False)}\n\n" ) await queue.put(None) return @@ -822,7 +823,7 @@ def _on_stderr(line: str) -> None: # Truncate long stderr for UI if len(details) > 500: details = details[:500] + "..." - err_payload = json.dumps( + err_payload = safe_json_dumps( {"error": "Ошибка процесса Claude CLI", "details": details}, ensure_ascii=False, ) @@ -845,7 +846,7 @@ def _on_stderr(line: str) -> None: if "overloaded" in str(exc).lower() else "Не удалось подключиться к Claude CLI" ) - err_payload = json.dumps( + err_payload = safe_json_dumps( {"error": conn_msg, "details": stderr_summary}, ensure_ascii=False, ) @@ -859,7 +860,7 @@ def _on_stderr(line: str) -> None: "Claude SDK error after %.1fs (thread %d): %s", elapsed, thread_id, exc, ) stderr_summary = "\n".join(stderr_lines[-10:]) if stderr_lines else "" - err_payload = json.dumps( + err_payload = safe_json_dumps( { "error": f"Ошибка Claude SDK: {exc}", "details": stderr_summary or None, @@ -937,7 +938,7 @@ def _on_stderr(line: str) -> None: ) # Send error details to user via SSE stderr_summary = "\n".join(stderr_lines[-10:]) if stderr_lines else "" - err_payload = json.dumps( + err_payload = safe_json_dumps( { "error": f"Ошибка агента: {last_err}", "details": stderr_summary or None, @@ -1628,9 +1629,9 @@ async def chat_stream( if not full_text: logger.debug("Deepagents returned empty response for provider=%s", cfg.provider) if full_text: - chunk_payload = json.dumps({"text": full_text}, ensure_ascii=False) + chunk_payload = safe_json_dumps({"text": full_text}, ensure_ascii=False) await queue.put(f"data: {chunk_payload}\n\n") - done_payload = json.dumps( + done_payload = safe_json_dumps( { "done": True, "full_text": full_text, @@ -1970,7 +1971,7 @@ async def chat_stream( status = await self.get_runtime_status() backend_name = status.selected_backend if status.error and (backend_name is None or status.using_override): - err_payload = json.dumps( + err_payload = safe_json_dumps( {"error": f"Ошибка агента: {status.error}"}, ensure_ascii=False ) yield f"data: {err_payload}\n\n" @@ -1983,7 +1984,7 @@ async def chat_stream( backend = self._deepagents_backend model = None else: - err_payload = json.dumps( + err_payload = safe_json_dumps( {"error": "Ошибка агента: не удалось выбрать backend."}, ensure_ascii=False, ) @@ -2057,7 +2058,7 @@ async def _run_backend( ): error_text = "Не удалось подключиться к Ollama. Проверьте, что сервис запущен." - err_payload = json.dumps( + err_payload = safe_json_dumps( {"error": failure_prefix(error_text)}, ensure_ascii=False, ) @@ -2084,7 +2085,7 @@ def _cleanup(t: asyncio.Task) -> None: task.add_done_callback(_cleanup) # Immediate feedback before backend connects (can take 10-30s) - init_payload = json.dumps( + init_payload = safe_json_dumps( {"type": "status", "text": f"Подключение к {backend_name}..."}, ensure_ascii=False, ) diff --git a/src/database/repositories/collection_tasks.py b/src/database/repositories/collection_tasks.py index c67c4a3..52ae0f4 100644 --- a/src/database/repositories/collection_tasks.py +++ b/src/database/repositories/collection_tasks.py @@ -18,6 +18,7 @@ StatsAllTaskPayload, TranslateBatchTaskPayload, ) +from src.utils.json import safe_json_dumps _ALLOWED_PAYLOAD_FILTER_KEYS = frozenset({"sq_id", "pipeline_id"}) @@ -90,7 +91,7 @@ def _serialize_payload( ), ): return payload.model_dump_json() - return json.dumps(payload) + return safe_json_dumps(payload) @staticmethod def _to_task(row: aiosqlite.Row) -> CollectionTask: diff --git a/src/database/repositories/generation_runs.py b/src/database/repositories/generation_runs.py index 6403724..9e025d2 100644 --- a/src/database/repositories/generation_runs.py +++ b/src/database/repositories/generation_runs.py @@ -69,11 +69,18 @@ async def create_run(self, pipeline_id: int | None, prompt: str) -> int: await self._db.commit() return cur.lastrowid or 0 - async def set_status(self, run_id: int, status: str) -> None: - await self._db.execute( - "UPDATE generation_runs SET status = ?, updated_at = datetime('now') WHERE id = ?", - (status, run_id), - ) + async def set_status(self, run_id: int, status: str, metadata: dict | None = None) -> None: + if metadata is not None: + await self._db.execute( + ("UPDATE generation_runs SET status = ?, metadata = ?, " + "updated_at = datetime('now') WHERE id = ?"), + (status, json.dumps(metadata, ensure_ascii=False), run_id), + ) + else: + await self._db.execute( + "UPDATE generation_runs SET status = ?, updated_at = datetime('now') WHERE id = ?", + (status, run_id), + ) await self._db.commit() async def save_result( diff --git a/src/database/repositories/telegram_commands.py b/src/database/repositories/telegram_commands.py index 27ae7fe..fcbb638 100644 --- a/src/database/repositories/telegram_commands.py +++ b/src/database/repositories/telegram_commands.py @@ -8,6 +8,7 @@ from src.database.repositories._transactions import begin_immediate from src.models import TelegramCommand, TelegramCommandStatus +from src.utils.json import safe_json_dumps def _parse_json(raw: str | None) -> dict[str, Any] | None: @@ -48,10 +49,10 @@ async def create_command(self, command: TelegramCommand) -> int: """, ( command.command_type, - json.dumps(command.payload), + safe_json_dumps(command.payload), command.status.value, command.requested_by, - json.dumps(command.result_payload) if command.result_payload is not None else None, + safe_json_dumps(command.result_payload) if command.result_payload is not None else None, ), ) await self._db.commit() @@ -174,7 +175,7 @@ async def update_command( TelegramCommandStatus.CANCELLED, }: finished_at = datetime.now(timezone.utc).isoformat() - payload_json = json.dumps(payload) if payload is not None else None + payload_json = safe_json_dumps(payload) if payload is not None else None if status == TelegramCommandStatus.PENDING: # Reset started_at when re-queueing so a retried command shows # a fresh run timestamp rather than the interrupted attempt's. @@ -182,7 +183,7 @@ async def update_command( params: list[Any] = [ status.value, error, - json.dumps(result_payload) if result_payload is not None else None, + safe_json_dumps(result_payload) if result_payload is not None else None, ] if payload_json is not None: sets.append("payload = ?") @@ -197,7 +198,7 @@ async def update_command( params = [ status.value, error, - json.dumps(result_payload) if result_payload is not None else None, + safe_json_dumps(result_payload) if result_payload is not None else None, finished_at, ] if payload_json is not None: diff --git a/src/services/agent_provider_service.py b/src/services/agent_provider_service.py index 0a91c16..0fa4e4d 100644 --- a/src/services/agent_provider_service.py +++ b/src/services/agent_provider_service.py @@ -25,6 +25,7 @@ from src.config import AppConfig, resolve_session_encryption_secret from src.database import Database from src.security import SessionCipher +from src.utils.json import safe_json_dumps logger = logging.getLogger(__name__) @@ -165,7 +166,7 @@ async def save_provider_configs(self, configs: list[ProviderRuntimeConfig]) -> N "last_validation_error": cfg.last_validation_error, } ) - await self._db.set_setting(PROVIDER_SETTINGS_KEY, json.dumps(payload, ensure_ascii=False)) + await self._db.set_setting(PROVIDER_SETTINGS_KEY, safe_json_dumps(payload, ensure_ascii=False)) async def load_model_cache(self) -> dict[str, ProviderModelCacheEntry]: raw = await self._db.get_setting(MODEL_CACHE_SETTINGS_KEY) @@ -234,7 +235,7 @@ async def save_model_cache(self, cache: dict[str, ProviderModelCacheEntry]) -> N for provider, entry in cache.items() } await self._db.set_setting( - MODEL_CACHE_SETTINGS_KEY, json.dumps(payload, ensure_ascii=False) + MODEL_CACHE_SETTINGS_KEY, safe_json_dumps(payload, ensure_ascii=False) ) async def refresh_models_for_provider( @@ -460,7 +461,7 @@ def config_fingerprint( secret_hash = "" if secret_payload: secret_hash = hashlib.sha256( - json.dumps(secret_payload, sort_keys=True, ensure_ascii=False).encode("utf-8") + safe_json_dumps(secret_payload, sort_keys=True, ensure_ascii=False).encode("utf-8") ).hexdigest()[:16] payload = { "provider": cfg.provider, @@ -470,7 +471,7 @@ def config_fingerprint( "secret_hash": secret_hash, } return hashlib.sha256( - json.dumps(payload, sort_keys=True, ensure_ascii=False).encode("utf-8") + safe_json_dumps(payload, sort_keys=True, ensure_ascii=False).encode("utf-8") ).hexdigest() def get_compatibility_record( @@ -648,7 +649,7 @@ async def export_compatibility_catalog( "providers": providers_payload, } export_path.write_text( - json.dumps(payload, ensure_ascii=False, indent=2) + "\n", + safe_json_dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8", ) return export_path diff --git a/src/services/content_generation_service.py b/src/services/content_generation_service.py index 63f0b51..f7434d2 100644 --- a/src/services/content_generation_service.py +++ b/src/services/content_generation_service.py @@ -180,13 +180,15 @@ async def generate( except Exception: logger.warning("Failed to send draft notification", exc_info=True) return run - except Exception: + except Exception as exc: logger.exception( "Content generation failed for pipeline_id=%s run_id=%s", pipeline.id, run_id, ) - await self._db.repos.generation_runs.set_status(run_id, "failed") + node_errors = getattr(exc, "node_errors", None) + meta = {"node_errors": node_errors} if node_errors else None + await self._db.repos.generation_runs.set_status(run_id, "failed", metadata=meta) raise async def _run_generation( diff --git a/src/services/image_provider_service.py b/src/services/image_provider_service.py index e1d0743..ac4f55c 100644 --- a/src/services/image_provider_service.py +++ b/src/services/image_provider_service.py @@ -16,6 +16,7 @@ from src.config import AppConfig, resolve_session_encryption_secret from src.database import Database from src.security import SessionCipher +from src.utils.json import safe_json_dumps if TYPE_CHECKING: from src.services.provider_adapters import ImageAdapter @@ -119,7 +120,7 @@ async def save_provider_configs(self, configs: list[ImageProviderConfig]) -> Non elif cfg._api_key_enc_preserved: entry["api_key_enc"] = cfg._api_key_enc_preserved payload.append(entry) - await self._db.set_setting(SETTINGS_KEY, json.dumps(payload, ensure_ascii=False)) + await self._db.set_setting(SETTINGS_KEY, safe_json_dumps(payload, ensure_ascii=False)) # ── UI helpers ── diff --git a/src/services/pipeline_executor.py b/src/services/pipeline_executor.py index 54e6cf1..b43de2b 100644 --- a/src/services/pipeline_executor.py +++ b/src/services/pipeline_executor.py @@ -96,43 +96,47 @@ async def execute( ordered = _topological_sort(graph) skipped: set[str] = set() + node_errors: list[dict[str, Any]] = [] for node in ordered: - if node.id in skipped: - logger.debug("Skipping node %s (downstream of failed condition/trigger)", node.id) - continue - - handler = get_handler(node.type) - prev_current_node_id = services.get("_current_node_id") - services["_current_node_id"] = node.id - try: - logger.debug("Executing node %s (%s)", node.id, node.type) - await handler.execute(node.config, context, services) - - # Short-circuit condition nodes: skip only downstream subtree if False - if node.type == PipelineNodeType.CONDITION: - if not context.get_global("condition_result", True): - logger.debug("Condition node %s is False; skipping downstream nodes", node.id) - skipped.update(self._downstream_nodes(graph, node.id)) - - # Short-circuit trigger nodes: skip downstream if not matched - if node.type == PipelineNodeType.SEARCH_QUERY_TRIGGER: - if not context.get_global("trigger_matched", False): - logger.debug("Trigger node %s did not match; skipping downstream nodes", node.id) - skipped.update(self._downstream_nodes(graph, node.id)) - except Exception: - logger.exception("Node %s (%s) failed during pipeline execution", node.id, node.type) - raise - finally: - if prev_current_node_id is None: - services.pop("_current_node_id", None) - else: - services["_current_node_id"] = prev_current_node_id + if node.id in skipped: + logger.debug("Skipping node %s (downstream of failed condition/trigger)", node.id) + continue + + handler = get_handler(node.type) + prev_current_node_id = services.get("_current_node_id") + services["_current_node_id"] = node.id + try: + logger.debug("Executing node %s (%s)", node.id, node.type) + await handler.execute(node.config, context, services) + + # Short-circuit condition nodes: skip only downstream subtree if False + if node.type == PipelineNodeType.CONDITION: + if not context.get_global("condition_result", True): + logger.debug("Condition node %s is False; skipping downstream nodes", node.id) + skipped.update(self._downstream_nodes(graph, node.id)) + + # Short-circuit trigger nodes: skip downstream if not matched + if node.type == PipelineNodeType.SEARCH_QUERY_TRIGGER: + if not context.get_global("trigger_matched", False): + logger.debug("Trigger node %s did not match; skipping downstream nodes", node.id) + skipped.update(self._downstream_nodes(graph, node.id)) + except Exception as exc: + logger.exception("Node %s (%s) failed during pipeline execution", node.id, node.type) + errors = context.get_errors() + if errors: + exc.node_errors = errors # type: ignore[attr-defined] + raise + finally: + if prev_current_node_id is None: + services.pop("_current_node_id", None) + else: + services["_current_node_id"] = prev_current_node_id + node_errors = context.get_errors() generated_text = context.get_global("generated_text", "") citations = context.get_global("citations", []) action_counts = get_action_counts(context) - node_errors = context.get_errors() result_kind, result_count = summarize_result( generated_text=generated_text, citations=citations, diff --git a/src/services/pipeline_nodes/handlers.py b/src/services/pipeline_nodes/handlers.py index ff0c8f3..372d9a5 100644 --- a/src/services/pipeline_nodes/handlers.py +++ b/src/services/pipeline_nodes/handlers.py @@ -87,7 +87,13 @@ class RetrieveContextHandler(BaseNodeHandler): async def execute(self, node_config: dict, context: NodeContext, services: dict) -> None: search_engine = services.get("search_engine") if search_engine is None: - logger.warning("RetrieveContextHandler: no search_engine in services, skipping") + node_id = _current_node_id(services, default="retrieve_context") + context.record_error( + node_id=node_id, + code="missing_dependency", + detail="search_engine not available in services", + ) + logger.warning("RetrieveContextHandler[%s]: no search_engine, skipping", node_id) context.set_global("context_messages", []) return @@ -205,7 +211,13 @@ class ImageGenerateHandler(BaseNodeHandler): async def execute(self, node_config: dict, context: NodeContext, services: dict) -> None: image_service = services.get("image_service") if image_service is None: - logger.info("ImageGenerateHandler: no image_service configured, skipping") + node_id = _current_node_id(services, default="image_generate") + context.record_error( + node_id=node_id, + code="missing_dependency", + detail="image_service not available in services", + ) + logger.warning("ImageGenerateHandler[%s]: no image_service, skipping", node_id) return text = context.get_global("generated_text", "") or "" @@ -248,7 +260,13 @@ class NotifyHandler(BaseNodeHandler): async def execute(self, node_config: dict, context: NodeContext, services: dict) -> None: notification_service = services.get("notification_service") if notification_service is None: - logger.info("NotifyHandler: no notification_service configured, skipping") + node_id = _current_node_id(services, default="notify") + context.record_error( + node_id=node_id, + code="missing_dependency", + detail="notification_service not available in services", + ) + logger.warning("NotifyHandler[%s]: no notification_service, skipping", node_id) return text = context.get_global("generated_text", "") or context.get_global("trigger_text", "") or "" @@ -299,7 +317,7 @@ async def execute(self, node_config: dict, context: NodeContext, services: dict) detail="client_pool not available in services", ) logger.warning("ReactHandler[%s]: no client_pool, skipping", node_id) - return + raise RuntimeError("ReactHandler: client_pool not available") messages = context.get_global("context_messages", []) emoji = node_config.get("emoji") or "👍" @@ -379,7 +397,7 @@ async def execute(self, node_config: dict, context: NodeContext, services: dict) detail="client_pool not available in services", ) logger.warning("ForwardHandler[%s]: no client_pool, skipping", node_id) - return + raise RuntimeError("ForwardHandler: client_pool not available") messages = context.get_global("context_messages", []) targets = node_config.get("targets", []) @@ -448,7 +466,7 @@ async def execute(self, node_config: dict, context: NodeContext, services: dict) detail="client_pool not available in services", ) logger.warning("DeleteMessageHandler[%s]: no client_pool, skipping", node_id) - return + raise RuntimeError("DeleteMessageHandler: client_pool not available") messages = context.get_global("context_messages", []) resolved_phone = _resolve_account_phone(services.get("account_phone"), services, context) @@ -540,7 +558,13 @@ class SearchQueryTriggerHandler(BaseNodeHandler): async def execute(self, node_config: dict, context: NodeContext, services: dict) -> None: search_engine = services.get("search_engine") if search_engine is None: - logger.warning("SearchQueryTriggerHandler: no search_engine, skipping") + node_id = _current_node_id(services, default="search_query_trigger") + context.record_error( + node_id=node_id, + code="missing_dependency", + detail="search_engine not available in services", + ) + logger.warning("SearchQueryTriggerHandler[%s]: no search_engine, skipping", node_id) return query = node_config.get("query", "") diff --git a/src/services/publish_service.py b/src/services/publish_service.py index 40a895a..6f2499f 100644 --- a/src/services/publish_service.py +++ b/src/services/publish_service.py @@ -82,6 +82,8 @@ async def _publish_to_target( ) -> PublishResult: """Publish to a single target.""" pool = self._client_pool + if pool is None: + return PublishResult(success=False, error="client_pool not configured") acquired_phone: str | None = None try: result = await pool.get_client_by_phone(target.phone) diff --git a/src/services/unified_dispatcher.py b/src/services/unified_dispatcher.py index a2a1fa0..0500e42 100644 --- a/src/services/unified_dispatcher.py +++ b/src/services/unified_dispatcher.py @@ -413,8 +413,8 @@ async def _handle_photo_due(self, task: CollectionTask) -> None: if not self._photo_task_service: await self._tasks.update_collection_task( task.id, - CollectionTaskStatus.COMPLETED, - note="No photo service", + CollectionTaskStatus.FAILED, + error="PhotoTaskService not configured", ) return try: @@ -440,8 +440,8 @@ async def _handle_photo_auto(self, task: CollectionTask) -> None: if not self._photo_auto_upload_service: await self._tasks.update_collection_task( task.id, - CollectionTaskStatus.COMPLETED, - note="No photo auto service", + CollectionTaskStatus.FAILED, + error="PhotoAutoUploadService not configured", ) return try: @@ -674,6 +674,14 @@ async def _handle_content_publish(self, task: CollectionTask) -> None: ) return + if self._client_pool is None: + await self._tasks.update_collection_task( + task.id, + CollectionTaskStatus.FAILED, + error="client_pool not configured", + ) + return + pipeline_id = payload.pipeline_id try: from src.services.pipeline_service import PipelineService diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/utils/json.py b/src/utils/json.py new file mode 100644 index 0000000..c7d63d2 --- /dev/null +++ b/src/utils/json.py @@ -0,0 +1,29 @@ +"""JSON utilities with safe serialization for external data.""" + +import json +from datetime import date, datetime + + +def safe_json_dumps(obj, **kwargs) -> str: + """ + JSON dumps with fallback for non-serializable types. + + Handles: + - datetime, date → .isoformat() + - bytes → .hex() + - Pydantic v2 models → .model_dump() + - Unknown → raises TypeError (fail-fast) + + Use for serializing external/untyped data (Telegram objects, DB payloads, etc.). + """ + def _default(o): + if isinstance(o, (datetime, date)): + return o.isoformat() + if isinstance(o, bytes): + return o.hex() + # Pydantic v2 + if hasattr(o, "model_dump"): + return o.model_dump() + raise TypeError(f"Object of type {type(o).__name__} is not JSON serializable") + + return json.dumps(obj, default=_default, **kwargs) diff --git a/src/web/routes/pipelines.py b/src/web/routes/pipelines.py index f06a464..981c7a7 100644 --- a/src/web/routes/pipelines.py +++ b/src/web/routes/pipelines.py @@ -1,6 +1,5 @@ from __future__ import annotations -import json import logging from urllib.parse import quote @@ -25,6 +24,7 @@ from src.services.pipeline_service import ( to_since_hours as _to_since_hours, ) +from src.utils.json import safe_json_dumps from src.web import deps logger = logging.getLogger(__name__) @@ -574,18 +574,18 @@ async def event_gen(): "text": update.get("generated_text"), "citations": update.get("citations"), } - yield f"data: {json.dumps(data)}\n\n" + yield f"data: {safe_json_dumps(data)}\n\n" # finished successfully final_text = last.get("generated_text") if last else "" metadata = {"citations": last.get("citations", []) if last else []} await db.repos.generation_runs.save_result(run_id, final_text, metadata) await db.repos.generation_runs.set_status(run_id, "completed") - yield f"event: done\ndata: {json.dumps({'run_id': run_id})}\n\n" + yield f"event: done\ndata: {safe_json_dumps({'run_id': run_id})}\n\n" except Exception: logger.exception("Generation stream failed for pipeline_id=%d run_id=%d", pipeline_id, run_id) await db.repos.generation_runs.set_status(run_id, "failed") - yield f"event: error\ndata: {json.dumps({'error': 'Generation failed'})}\n\n" + yield f"event: error\ndata: {safe_json_dumps({'error': 'Generation failed'})}\n\n" except BaseException: await db.repos.generation_runs.set_status(run_id, "failed") raise @@ -810,9 +810,8 @@ async def export_pipeline(request: Request, pipeline_id: int): data = await svc.export_json(pipeline_id) if data is None: return _pipeline_redirect("pipeline_invalid", error=True) - import json as _json filename = f"pipeline_{pipeline_id}.json" - content = _json.dumps(data, ensure_ascii=False, indent=2) + content = safe_json_dumps(data, ensure_ascii=False, indent=2) return Response( content=content, media_type="application/json", diff --git a/tests/test_content_generation_extra.py b/tests/test_content_generation_extra.py index 8b622c7..20270c7 100644 --- a/tests/test_content_generation_extra.py +++ b/tests/test_content_generation_extra.py @@ -67,7 +67,7 @@ async def create_run(self, pipeline_id, prompt): self._runs[run_id] = run return run_id - async def set_status(self, run_id, status): + async def set_status(self, run_id, status, metadata=None): if run_id in self._runs: self._runs[run_id].status = status diff --git a/tests/test_content_generation_service.py b/tests/test_content_generation_service.py index b6c8b35..fe0380e 100644 --- a/tests/test_content_generation_service.py +++ b/tests/test_content_generation_service.py @@ -37,7 +37,7 @@ async def create_run(self, pipeline_id, prompt): self._runs[run_id] = run return run_id - async def set_status(self, run_id, status): + async def set_status(self, run_id, status, metadata=None): if run_id in self._runs: self._runs[run_id].status = status diff --git a/tests/test_coverage_batch4.py b/tests/test_coverage_batch4.py index 3bb5b6d..53dbe2b 100644 --- a/tests/test_coverage_batch4.py +++ b/tests/test_coverage_batch4.py @@ -1836,7 +1836,7 @@ async def test_dispatcher_handle_photo_due_no_service(): task = _make_task(CollectionTaskType.PHOTO_DUE) await dispatcher._handle_photo_due(task) tasks_repo.update_collection_task.assert_awaited_with( - 1, CollectionTaskStatus.COMPLETED, note="No photo service" + 1, CollectionTaskStatus.FAILED, error="PhotoTaskService not configured" ) @@ -1876,7 +1876,7 @@ async def test_dispatcher_handle_photo_auto_no_service(): task = _make_task(CollectionTaskType.PHOTO_AUTO) await dispatcher._handle_photo_auto(task) tasks_repo.update_collection_task.assert_awaited_with( - 1, CollectionTaskStatus.COMPLETED, note="No photo auto service" + 1, CollectionTaskStatus.FAILED, error="PhotoAutoUploadService not configured" ) diff --git a/tests/test_coverage_batch8.py b/tests/test_coverage_batch8.py index 2f6d293..6e368ca 100644 --- a/tests/test_coverage_batch8.py +++ b/tests/test_coverage_batch8.py @@ -1381,7 +1381,7 @@ async def test_photo_due_no_service(self): await dispatcher._handle_photo_due(task) tasks_repo.update_collection_task.assert_awaited() call_args = tasks_repo.update_collection_task.call_args - assert call_args[0][1] == CollectionTaskStatus.COMPLETED + assert call_args[0][1] == CollectionTaskStatus.FAILED @pytest.mark.asyncio async def test_photo_auto_no_service(self): diff --git a/tests/test_pipeline_nodes_handlers.py b/tests/test_pipeline_nodes_handlers.py index 7c9961b..b790350 100644 --- a/tests/test_pipeline_nodes_handlers.py +++ b/tests/test_pipeline_nodes_handlers.py @@ -77,6 +77,8 @@ async def test_retrieve_context_no_search_engine(): ctx = NodeContext() await RetrieveContextHandler().execute({}, ctx, {}) assert ctx.get_global("context_messages") == [] + errors = ctx.get_errors() + assert any(e["code"] == "missing_dependency" for e in errors) @pytest.mark.asyncio @@ -260,6 +262,8 @@ async def test_image_generate_no_service(): ctx.set_global("generated_text", "a cat") await ImageGenerateHandler().execute({"model": "test"}, ctx, {}) assert ctx.get_global("image_url") is None + errors = ctx.get_errors() + assert any(e["code"] == "missing_dependency" for e in errors) @pytest.mark.asyncio @@ -330,7 +334,8 @@ async def test_notify_no_service(): ctx = NodeContext() ctx.set_global("generated_text", "hello") await NotifyHandler().execute({}, ctx, {}) - # No crash + errors = ctx.get_errors() + assert any(e["code"] == "missing_dependency" for e in errors) @pytest.mark.asyncio @@ -500,8 +505,8 @@ async def test_delay_range(mock_sleep): async def test_react_no_client_pool(): ctx = NodeContext() ctx.set_global("context_messages", [_msg()]) - await ReactHandler().execute({"emoji": "👍"}, ctx, {}) - # No crash + with pytest.raises(RuntimeError, match="client_pool not available"): + await ReactHandler().execute({"emoji": "👍"}, ctx, {}) @pytest.mark.asyncio @@ -551,9 +556,10 @@ async def test_react_random_emojis(): async def test_react_records_error_when_no_client_pool(): ctx = NodeContext() ctx.set_global("context_messages", [_msg()]) - await ReactHandler().execute( - {"emoji": "👍"}, ctx, {"client_pool": None, "_current_node_id": "react_1"} - ) + with pytest.raises(RuntimeError, match="client_pool not available"): + await ReactHandler().execute( + {"emoji": "👍"}, ctx, {"client_pool": None, "_current_node_id": "react_1"} + ) errors = ctx.get_errors() assert len(errors) == 1 assert errors[0]["code"] == "no_client_pool" @@ -644,8 +650,8 @@ async def test_react_records_unexpected_error_for_generic_exception(): async def test_forward_no_client_pool(): ctx = NodeContext() ctx.set_global("context_messages", [_msg()]) - await ForwardHandler().execute({"targets": []}, ctx, {}) - # No crash + with pytest.raises(RuntimeError, match="client_pool not available"): + await ForwardHandler().execute({"targets": []}, ctx, {}) @pytest.mark.asyncio @@ -695,11 +701,12 @@ async def test_forward_get_client_returns_none_skips(): async def test_forward_records_error_when_no_client_pool(): ctx = NodeContext() ctx.set_global("context_messages", [_msg()]) - await ForwardHandler().execute( - {"targets": [{"phone": "+1", "dialog_id": -1}]}, - ctx, - {"client_pool": None, "_current_node_id": "fwd_1"}, - ) + with pytest.raises(RuntimeError, match="client_pool not available"): + await ForwardHandler().execute( + {"targets": [{"phone": "+1", "dialog_id": -1}]}, + ctx, + {"client_pool": None, "_current_node_id": "fwd_1"}, + ) errors = ctx.get_errors() assert errors and errors[0]["code"] == "no_client_pool" assert errors[0]["node_id"] == "fwd_1" @@ -750,8 +757,8 @@ async def test_forward_records_flood_wait_error(): async def test_delete_no_client_pool(): ctx = NodeContext() ctx.set_global("context_messages", [_msg()]) - await DeleteMessageHandler().execute({}, ctx, {}) - # No crash + with pytest.raises(RuntimeError, match="client_pool not available"): + await DeleteMessageHandler().execute({}, ctx, {}) @pytest.mark.asyncio @@ -786,9 +793,10 @@ async def test_delete_client_none_breaks(): async def test_delete_records_error_when_no_client_pool(): ctx = NodeContext() ctx.set_global("context_messages", [_msg()]) - await DeleteMessageHandler().execute( - {}, ctx, {"client_pool": None, "_current_node_id": "del_1"} - ) + with pytest.raises(RuntimeError, match="client_pool not available"): + await DeleteMessageHandler().execute( + {}, ctx, {"client_pool": None, "_current_node_id": "del_1"} + ) errors = ctx.get_errors() assert errors and errors[0]["code"] == "no_client_pool" assert errors[0]["node_id"] == "del_1" @@ -873,7 +881,8 @@ async def test_condition_gt_type_error(): async def test_search_trigger_no_engine(): ctx = NodeContext() await SearchQueryTriggerHandler().execute({"query": "test"}, ctx, {}) - # No crash, no trigger_matched set + errors = ctx.get_errors() + assert any(e["code"] == "missing_dependency" for e in errors) @pytest.mark.asyncio diff --git a/tests/test_unified_dispatcher.py b/tests/test_unified_dispatcher.py index e664c35..5e0ad00 100644 --- a/tests/test_unified_dispatcher.py +++ b/tests/test_unified_dispatcher.py @@ -588,7 +588,7 @@ async def test_handle_photo_due_success(dispatcher, mock_tasks_repo, mock_photo_ @pytest.mark.asyncio async def test_handle_photo_due_no_service(dispatcher, mock_tasks_repo): - """_handle_photo_due completes with note when no service configured.""" + """_handle_photo_due fails when no service configured (was COMPLETED silently).""" dispatcher._photo_task_service = None task = CollectionTask( @@ -602,7 +602,7 @@ async def test_handle_photo_due_no_service(dispatcher, mock_tasks_repo): mock_tasks_repo.update_collection_task.assert_called() args, kwargs = mock_tasks_repo.update_collection_task.call_args - assert args[1] == CollectionTaskStatus.COMPLETED + assert args[1] == CollectionTaskStatus.FAILED @pytest.mark.asyncio @@ -651,7 +651,7 @@ async def test_handle_photo_auto_success(dispatcher, mock_tasks_repo, mock_photo @pytest.mark.asyncio async def test_handle_photo_auto_no_service(dispatcher, mock_tasks_repo): - """_handle_photo_auto completes with note when no service configured.""" + """_handle_photo_auto fails when no service configured (was COMPLETED silently).""" dispatcher._photo_auto_upload_service = None task = CollectionTask( @@ -665,7 +665,7 @@ async def test_handle_photo_auto_no_service(dispatcher, mock_tasks_repo): mock_tasks_repo.update_collection_task.assert_called() args, kwargs = mock_tasks_repo.update_collection_task.call_args - assert args[1] == CollectionTaskStatus.COMPLETED + assert args[1] == CollectionTaskStatus.FAILED # === _handle_pipeline_run tests === @@ -1081,6 +1081,7 @@ async def mock_execute(query, params=()): dispatcher._db = mock_db dispatcher._pipeline_bundle = MagicMock() + dispatcher._client_pool = MagicMock() task = CollectionTask( id=1, @@ -1132,6 +1133,7 @@ async def mock_execute(query, params=()): dispatcher._db = mock_db dispatcher._pipeline_bundle = mock_pipeline_bundle + dispatcher._client_pool = MagicMock() task = CollectionTask( id=1, diff --git a/tests/test_unified_dispatcher_extra.py b/tests/test_unified_dispatcher_extra.py index c0afe0b..cfbc8f1 100644 --- a/tests/test_unified_dispatcher_extra.py +++ b/tests/test_unified_dispatcher_extra.py @@ -177,7 +177,7 @@ async def test_photo_due_no_id(): async def test_photo_due_no_service(): d = _dispatcher(photo_task_service=None) await d._handle_photo_due(_task(CollectionTaskType.PHOTO_DUE)) - assert d._tasks.update_collection_task.call_args[0][1] == CollectionTaskStatus.COMPLETED + assert d._tasks.update_collection_task.call_args[0][1] == CollectionTaskStatus.FAILED @pytest.mark.asyncio @@ -209,7 +209,7 @@ async def test_photo_auto_no_id(): async def test_photo_auto_no_service(): d = _dispatcher(photo_auto_upload_service=None) await d._handle_photo_auto(_task(CollectionTaskType.PHOTO_AUTO)) - assert d._tasks.update_collection_task.call_args[0][1] == CollectionTaskStatus.COMPLETED + assert d._tasks.update_collection_task.call_args[0][1] == CollectionTaskStatus.FAILED @pytest.mark.asyncio diff --git a/tests/test_unified_dispatcher_extra2.py b/tests/test_unified_dispatcher_extra2.py index 329c10c..e356d5f 100644 --- a/tests/test_unified_dispatcher_extra2.py +++ b/tests/test_unified_dispatcher_extra2.py @@ -322,6 +322,7 @@ async def mock_execute(query, params=()): d._db = mock_db d._pipeline_bundle = mock_pipeline_bundle + d._client_pool = MagicMock() payload = ContentPublishTaskPayload(pipeline_id=None) @@ -385,6 +386,7 @@ async def mock_execute(query, params=()): d._db = mock_db d._pipeline_bundle = MagicMock() + d._client_pool = MagicMock() payload = ContentPublishTaskPayload(pipeline_id=None) @@ -442,6 +444,7 @@ async def mock_execute(query, params=()): d._db = mock_db d._pipeline_bundle = mock_pipeline_bundle + d._client_pool = MagicMock() payload = ContentPublishTaskPayload(pipeline_id=None) @@ -473,6 +476,7 @@ async def mock_execute(query, params=()): d._db = mock_db d._pipeline_bundle = MagicMock() + d._client_pool = MagicMock() payload = ContentPublishTaskPayload(pipeline_id=None)