Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 25 additions & 24 deletions src/agent/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
ProviderModelCacheEntry,
ProviderModelCompatibilityRecord,
)
from src.utils.json import safe_json_dumps

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -133,7 +134,7 @@ class _ToolTracker:
_tool_id_to_name: dict[str, str] = field(default_factory=dict, init=False)

async def _put(self, payload: dict) -> None:
await self.queue.put(f"data: {json.dumps(payload, ensure_ascii=False)}\n\n")
await self.queue.put(f"data: {safe_json_dumps(payload, ensure_ascii=False)}\n\n")
await asyncio.sleep(0)

async def on_first_event(self) -> None:
Expand Down Expand Up @@ -268,7 +269,7 @@ async def _ticker() -> None:
"Timeout extended +%.0fs (SDK activity, was %.0fs left, %d extensions left)",
activity_extend, remaining, extensions_remaining,
)
ext_payload = json.dumps(
ext_payload = safe_json_dumps(
{
"type": "status",
"text": f"Агент активен — продлён (+{int(activity_extend)}с)",
Expand All @@ -287,7 +288,7 @@ async def _ticker() -> None:
and (now - api_request_ts[0]) > thinking_delay
):
_thinking_shown = True
thinking_payload = json.dumps(
thinking_payload = safe_json_dumps(
{"type": "thinking", "text": "Думает..."},
ensure_ascii=False,
)
Expand All @@ -298,7 +299,7 @@ async def _ticker() -> None:
# Show countdown
remaining_int = int(deadline - time.monotonic())
if remaining_int > 0:
payload = json.dumps(
payload = safe_json_dumps(
{"type": "countdown", "text": f"{label} ({remaining_int}с до таймаута)"},
ensure_ascii=False,
)
Expand Down Expand Up @@ -482,7 +483,7 @@ def _on_stderr(line: str) -> None:
_api_request_count[0] += 1
_api_request_ts[0] = time.monotonic()
label = f"Жду ответ Claude API #{_api_request_count[0]}: «{_prompt_short}»"
payload = json.dumps({"type": "status", "text": label}, ensure_ascii=False)
payload = safe_json_dumps({"type": "status", "text": label}, ensure_ascii=False)
try:
queue.put_nowait(f"data: {payload}\n\n")
except Exception:
Expand All @@ -496,7 +497,7 @@ def _on_stderr(line: str) -> None:
break
if label and label != _last_emitted[0]:
_last_emitted[0] = label
payload = json.dumps({"type": "status", "text": label}, ensure_ascii=False)
payload = safe_json_dumps({"type": "status", "text": label}, ensure_ascii=False)
try:
queue.put_nowait(f"data: {payload}\n\n")
except Exception:
Expand All @@ -512,7 +513,7 @@ def _on_stderr(line: str) -> None:
for tag in ("[WARN]", "[warn]", "[ERROR]", "[error]"):
display = display.replace(tag, "").strip()
if display:
warn_payload = json.dumps(
warn_payload = safe_json_dumps(
{"type": "warning", "text": display},
ensure_ascii=False,
)
Expand Down Expand Up @@ -660,7 +661,7 @@ def _on_stderr(line: str) -> None:
_last_rate_limit[0] = rl_summary
if rl_status == "rejected":
# Hard reject — surface as warning, not just status
warn_payload = json.dumps(
warn_payload = safe_json_dumps(
{"type": "warning", "text": f"⛔ {rl_text}. API отклоняет запросы."},
ensure_ascii=False,
)
Expand Down Expand Up @@ -697,7 +698,7 @@ def _on_stderr(line: str) -> None:
_last_activity[0] = time.monotonic()
full_text += text_chunk
streamed = True
chunk_payload = json.dumps(
chunk_payload = safe_json_dumps(
{"text": text_chunk}, ensure_ascii=False
)
await queue.put(f"data: {chunk_payload}\n\n")
Expand Down Expand Up @@ -727,7 +728,7 @@ def _on_stderr(line: str) -> None:
for _idx, block in enumerate(msg.content):
if isinstance(block, TextBlock) and not streamed:
full_text += block.text
chunk_payload = json.dumps(
chunk_payload = safe_json_dumps(
{"text": block.text}, ensure_ascii=False
)
await queue.put(f"data: {chunk_payload}\n\n")
Expand All @@ -739,7 +740,7 @@ def _on_stderr(line: str) -> None:
block.name, _idx, tool_use_id=block.id
)
tracker.accumulate_input(
json.dumps(block.input or {}, ensure_ascii=False)
safe_json_dumps(block.input or {}, ensure_ascii=False)
)
await tracker.on_block_stop(_idx)
elif isinstance(block, ToolResultBlock):
Expand Down Expand Up @@ -771,7 +772,7 @@ def _on_stderr(line: str) -> None:
_sid = getattr(msg, "session_id", None)
if isinstance(_sid, str) and _sid:
done_data["session_id"] = _sid
done_payload = json.dumps(done_data, ensure_ascii=False)
done_payload = safe_json_dumps(done_data, ensure_ascii=False)
await queue.put(f"data: {done_payload}\n\n")
else:
logger.warning(
Expand All @@ -791,7 +792,7 @@ def _on_stderr(line: str) -> None:
"Agent timeout after %.1fs (thread %d): %s", elapsed, thread_id, exc,
)
stderr_summary = "\n".join(stderr_lines[-10:]) if stderr_lines else None
err_payload = json.dumps(
err_payload = safe_json_dumps(
{"error": str(exc), "details": stderr_summary},
ensure_ascii=False,
)
Expand All @@ -806,7 +807,7 @@ def _on_stderr(line: str) -> None:
"Установите: npm install -g @anthropic-ai/claude-code"
)
await queue.put(
f"data: {json.dumps({'error': err_msg}, ensure_ascii=False)}\n\n"
f"data: {safe_json_dumps({'error': err_msg}, ensure_ascii=False)}\n\n"
)
await queue.put(None)
return
Expand All @@ -822,7 +823,7 @@ def _on_stderr(line: str) -> None:
# Truncate long stderr for UI
if len(details) > 500:
details = details[:500] + "..."
err_payload = json.dumps(
err_payload = safe_json_dumps(
{"error": "Ошибка процесса Claude CLI", "details": details},
ensure_ascii=False,
)
Expand All @@ -845,7 +846,7 @@ def _on_stderr(line: str) -> None:
if "overloaded" in str(exc).lower()
else "Не удалось подключиться к Claude CLI"
)
err_payload = json.dumps(
err_payload = safe_json_dumps(
{"error": conn_msg, "details": stderr_summary},
ensure_ascii=False,
)
Expand All @@ -859,7 +860,7 @@ def _on_stderr(line: str) -> None:
"Claude SDK error after %.1fs (thread %d): %s", elapsed, thread_id, exc,
)
stderr_summary = "\n".join(stderr_lines[-10:]) if stderr_lines else ""
err_payload = json.dumps(
err_payload = safe_json_dumps(
{
"error": f"Ошибка Claude SDK: {exc}",
"details": stderr_summary or None,
Expand Down Expand Up @@ -937,7 +938,7 @@ def _on_stderr(line: str) -> None:
)
# Send error details to user via SSE
stderr_summary = "\n".join(stderr_lines[-10:]) if stderr_lines else ""
err_payload = json.dumps(
err_payload = safe_json_dumps(
{
"error": f"Ошибка агента: {last_err}",
"details": stderr_summary or None,
Expand Down Expand Up @@ -1628,9 +1629,9 @@ async def chat_stream(
if not full_text:
logger.debug("Deepagents returned empty response for provider=%s", cfg.provider)
if full_text:
chunk_payload = json.dumps({"text": full_text}, ensure_ascii=False)
chunk_payload = safe_json_dumps({"text": full_text}, ensure_ascii=False)
await queue.put(f"data: {chunk_payload}\n\n")
done_payload = json.dumps(
done_payload = safe_json_dumps(
{
"done": True,
"full_text": full_text,
Expand Down Expand Up @@ -1970,7 +1971,7 @@ async def chat_stream(
status = await self.get_runtime_status()
backend_name = status.selected_backend
if status.error and (backend_name is None or status.using_override):
err_payload = json.dumps(
err_payload = safe_json_dumps(
{"error": f"Ошибка агента: {status.error}"}, ensure_ascii=False
)
yield f"data: {err_payload}\n\n"
Expand All @@ -1983,7 +1984,7 @@ async def chat_stream(
backend = self._deepagents_backend
model = None
else:
err_payload = json.dumps(
err_payload = safe_json_dumps(
{"error": "Ошибка агента: не удалось выбрать backend."},
ensure_ascii=False,
)
Expand Down Expand Up @@ -2057,7 +2058,7 @@ async def _run_backend(
):
error_text = "Не удалось подключиться к Ollama. Проверьте, что сервис запущен."

err_payload = json.dumps(
err_payload = safe_json_dumps(
{"error": failure_prefix(error_text)},
ensure_ascii=False,
)
Expand All @@ -2084,7 +2085,7 @@ def _cleanup(t: asyncio.Task) -> None:
task.add_done_callback(_cleanup)

# Immediate feedback before backend connects (can take 10-30s)
init_payload = json.dumps(
init_payload = safe_json_dumps(
{"type": "status", "text": f"Подключение к {backend_name}..."},
ensure_ascii=False,
)
Expand Down
3 changes: 2 additions & 1 deletion src/database/repositories/collection_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
StatsAllTaskPayload,
TranslateBatchTaskPayload,
)
from src.utils.json import safe_json_dumps

_ALLOWED_PAYLOAD_FILTER_KEYS = frozenset({"sq_id", "pipeline_id"})

Expand Down Expand Up @@ -90,7 +91,7 @@ def _serialize_payload(
),
):
return payload.model_dump_json()
return json.dumps(payload)
return safe_json_dumps(payload)

@staticmethod
def _to_task(row: aiosqlite.Row) -> CollectionTask:
Expand Down
17 changes: 12 additions & 5 deletions src/database/repositories/generation_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,18 @@ async def create_run(self, pipeline_id: int | None, prompt: str) -> int:
await self._db.commit()
return cur.lastrowid or 0

async def set_status(self, run_id: int, status: str) -> None:
await self._db.execute(
"UPDATE generation_runs SET status = ?, updated_at = datetime('now') WHERE id = ?",
(status, run_id),
)
async def set_status(self, run_id: int, status: str, metadata: dict | None = None) -> None:
if metadata is not None:
await self._db.execute(
("UPDATE generation_runs SET status = ?, metadata = ?, "
"updated_at = datetime('now') WHERE id = ?"),
(status, json.dumps(metadata, ensure_ascii=False), run_id),
)
else:
await self._db.execute(
"UPDATE generation_runs SET status = ?, updated_at = datetime('now') WHERE id = ?",
(status, run_id),
)
await self._db.commit()

async def save_result(
Expand Down
11 changes: 6 additions & 5 deletions src/database/repositories/telegram_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from src.database.repositories._transactions import begin_immediate
from src.models import TelegramCommand, TelegramCommandStatus
from src.utils.json import safe_json_dumps


def _parse_json(raw: str | None) -> dict[str, Any] | None:
Expand Down Expand Up @@ -48,10 +49,10 @@ async def create_command(self, command: TelegramCommand) -> int:
""",
(
command.command_type,
json.dumps(command.payload),
safe_json_dumps(command.payload),
command.status.value,
command.requested_by,
json.dumps(command.result_payload) if command.result_payload is not None else None,
safe_json_dumps(command.result_payload) if command.result_payload is not None else None,
),
)
await self._db.commit()
Expand Down Expand Up @@ -174,15 +175,15 @@ async def update_command(
TelegramCommandStatus.CANCELLED,
}:
finished_at = datetime.now(timezone.utc).isoformat()
payload_json = json.dumps(payload) if payload is not None else None
payload_json = safe_json_dumps(payload) if payload is not None else None
if status == TelegramCommandStatus.PENDING:
# Reset started_at when re-queueing so a retried command shows
# a fresh run timestamp rather than the interrupted attempt's.
sets = ["status = ?", "error = ?", "result_payload = ?", "started_at = NULL", "finished_at = NULL"]
params: list[Any] = [
status.value,
error,
json.dumps(result_payload) if result_payload is not None else None,
safe_json_dumps(result_payload) if result_payload is not None else None,
]
if payload_json is not None:
sets.append("payload = ?")
Expand All @@ -197,7 +198,7 @@ async def update_command(
params = [
status.value,
error,
json.dumps(result_payload) if result_payload is not None else None,
safe_json_dumps(result_payload) if result_payload is not None else None,
finished_at,
]
if payload_json is not None:
Expand Down
11 changes: 6 additions & 5 deletions src/services/agent_provider_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from src.config import AppConfig, resolve_session_encryption_secret
from src.database import Database
from src.security import SessionCipher
from src.utils.json import safe_json_dumps

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -165,7 +166,7 @@ async def save_provider_configs(self, configs: list[ProviderRuntimeConfig]) -> N
"last_validation_error": cfg.last_validation_error,
}
)
await self._db.set_setting(PROVIDER_SETTINGS_KEY, json.dumps(payload, ensure_ascii=False))
await self._db.set_setting(PROVIDER_SETTINGS_KEY, safe_json_dumps(payload, ensure_ascii=False))

async def load_model_cache(self) -> dict[str, ProviderModelCacheEntry]:
raw = await self._db.get_setting(MODEL_CACHE_SETTINGS_KEY)
Expand Down Expand Up @@ -234,7 +235,7 @@ async def save_model_cache(self, cache: dict[str, ProviderModelCacheEntry]) -> N
for provider, entry in cache.items()
}
await self._db.set_setting(
MODEL_CACHE_SETTINGS_KEY, json.dumps(payload, ensure_ascii=False)
MODEL_CACHE_SETTINGS_KEY, safe_json_dumps(payload, ensure_ascii=False)
)

async def refresh_models_for_provider(
Expand Down Expand Up @@ -460,7 +461,7 @@ def config_fingerprint(
secret_hash = ""
if secret_payload:
secret_hash = hashlib.sha256(
json.dumps(secret_payload, sort_keys=True, ensure_ascii=False).encode("utf-8")
safe_json_dumps(secret_payload, sort_keys=True, ensure_ascii=False).encode("utf-8")
).hexdigest()[:16]
payload = {
"provider": cfg.provider,
Expand All @@ -470,7 +471,7 @@ def config_fingerprint(
"secret_hash": secret_hash,
}
return hashlib.sha256(
json.dumps(payload, sort_keys=True, ensure_ascii=False).encode("utf-8")
safe_json_dumps(payload, sort_keys=True, ensure_ascii=False).encode("utf-8")
).hexdigest()

def get_compatibility_record(
Expand Down Expand Up @@ -648,7 +649,7 @@ async def export_compatibility_catalog(
"providers": providers_payload,
}
export_path.write_text(
json.dumps(payload, ensure_ascii=False, indent=2) + "\n",
safe_json_dumps(payload, ensure_ascii=False, indent=2) + "\n",
encoding="utf-8",
)
return export_path
Expand Down
6 changes: 4 additions & 2 deletions src/services/content_generation_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,15 @@ async def generate(
except Exception:
logger.warning("Failed to send draft notification", exc_info=True)
return run
except Exception:
except Exception as exc:
logger.exception(
"Content generation failed for pipeline_id=%s run_id=%s",
pipeline.id,
run_id,
)
await self._db.repos.generation_runs.set_status(run_id, "failed")
node_errors = getattr(exc, "node_errors", None)
meta = {"node_errors": node_errors} if node_errors else None
await self._db.repos.generation_runs.set_status(run_id, "failed", metadata=meta)
raise

async def _run_generation(
Expand Down
3 changes: 2 additions & 1 deletion src/services/image_provider_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from src.config import AppConfig, resolve_session_encryption_secret
from src.database import Database
from src.security import SessionCipher
from src.utils.json import safe_json_dumps

if TYPE_CHECKING:
from src.services.provider_adapters import ImageAdapter
Expand Down Expand Up @@ -119,7 +120,7 @@ async def save_provider_configs(self, configs: list[ImageProviderConfig]) -> Non
elif cfg._api_key_enc_preserved:
entry["api_key_enc"] = cfg._api_key_enc_preserved
payload.append(entry)
await self._db.set_setting(SETTINGS_KEY, json.dumps(payload, ensure_ascii=False))
await self._db.set_setting(SETTINGS_KEY, safe_json_dumps(payload, ensure_ascii=False))

# ── UI helpers ──

Expand Down
Loading
Loading