Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion agent_reach/channels/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from .xiaoyuzhou import XiaoyuzhouChannel
from .v2ex import V2EXChannel
from .xueqiu import XueqiuChannel

from .toutiao import ToutiaoChannel


ALL_CHANNELS: List[Channel] = [
Expand All @@ -40,6 +40,7 @@
XiaoyuzhouChannel(),
V2EXChannel(),
XueqiuChannel(),
ToutiaoChannel(),
RSSChannel(),
ExaSearchChannel(),
WebChannel(),
Expand Down
115 changes: 115 additions & 0 deletions agent_reach/channels/toutiao.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
"""Toutiao (今日头条) — search articles and trending content."""

import json
import re
import urllib.parse
import urllib.request
from typing import Any, List, Tuple

from .base import Channel

_UA = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
_TIMEOUT = 10
_SEARCH_URL = (
"https://so.toutiao.com/search"
"?dvpf=pc&source=input&keyword={keyword}&enable_druid_v2=1"
)

_SKIP_TEMPLATES = ("Search", "Bottom", "76-", "20-", "26-", "67-baike")


def _fetch_html(url: str) -> str:
"""Fetch URL and return HTML string."""
req = urllib.request.Request(url, headers={"User-Agent": _UA})
with urllib.request.urlopen(req, timeout=_TIMEOUT) as resp:
return resp.read().decode("utf-8")


def _parse_search_results(html: str) -> list:
"""Extract article results from Toutiao search page HTML.

The page embeds each search result as a JSON object inside a <script> tag.
"""
scripts = re.findall(r"<script[^>]*>(.*?)</script>", html, re.DOTALL)
articles = []
for s in scripts:
if len(s) < 1000:
continue
if not s.strip().startswith("{"):
continue
try:
data = json.loads(s).get("data", {})
except (json.JSONDecodeError, ValueError):
continue
if not isinstance(data, dict):
continue

title = data.get("title", "")
tpl = data.get("template_key", "")
if not title:
continue
if any(tpl.startswith(p) for p in _SKIP_TEMPLATES):
continue

article_url = (
(data.get("display") or {}).get("info", {}).get("url", "")
or data.get("article_url", "")
or data.get("source_url", "")
)

if not article_url:
continue

articles.append({
"title": title,
"url": article_url,
"source": data.get("media_name", "") or data.get("source", ""),
"abstract": (data.get("abstract", "") or "")[:300],
"publish_time": data.get("publish_time"),
"read_count": data.get("read_count"),
"comment_count": data.get("comment_count"),
})
return articles


class ToutiaoChannel(Channel):
    """Channel for Toutiao (今日头条) search and news content."""

    name = "toutiao"
    description = "今日头条搜索与资讯"
    backends = ["Toutiao Web (public)"]
    tier = 0

    def can_handle(self, url: str) -> bool:
        """Return True for URLs on toutiao.com or any of its subdomains.

        Matches on the registered-domain boundary rather than a bare
        substring test, so hosts like "nottoutiao.com" or
        "toutiao.com.evil.example" are rejected.
        """
        from urllib.parse import urlparse
        host = urlparse(url).netloc.lower()
        # Drop an explicit port, if any, before the domain comparison.
        host = host.split(":", 1)[0]
        return host == "toutiao.com" or host.endswith(".toutiao.com")

    def check(self, config=None) -> Tuple[str, str]:
        """Probe the public search endpoint and report availability.

        Returns:
            ("ok" | "warn", human-readable status message).
            Never raises; network failures are reported as "warn".
        """
        try:
            test_url = _SEARCH_URL.format(keyword="test")
            req = urllib.request.Request(test_url, headers={"User-Agent": _UA})
            with urllib.request.urlopen(req, timeout=_TIMEOUT) as resp:
                if resp.status == 200:
                    content = resp.read().decode("utf-8")
                    # A real search page embeds JSON payloads; this marker
                    # check guards against captcha / anti-bot interstitials.
                    if "data" in content or "title" in content:
                        return "ok", "头条搜索可用(搜索文章、视频、资讯)"
                    return "warn", "头条搜索返回非预期内容"
                return "warn", "头条搜索返回非 200 状态"
        except Exception as e:
            return "warn", f"头条搜索连接失败(可能需要代理):{e}"

    def search(self, keyword: str, limit: int = 10) -> list:
        """Search Toutiao articles for *keyword*.

        Args:
            keyword: Search term (raw text; URL-encoded internally).
            limit: Maximum number of results to return.

        Returns:
            list of dicts with keys:
            title, url, source, abstract, publish_time, read_count,
            comment_count
        """
        encoded = urllib.parse.quote(keyword)
        url = _SEARCH_URL.format(keyword=encoded)
        html = _fetch_html(url)
        return _parse_search_results(html)[:limit]
1 change: 1 addition & 0 deletions tests/test_channel_contracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def test_channel_can_handle_contract():
"weibo": "https://weibo.com/u/1749127163",
"rss": "https://example.com/feed.xml",
"xueqiu": "https://xueqiu.com/S/SH600519",
"toutiao": "https://so.toutiao.com/search?keyword=test",
"exa_search": "https://example.com",
"web": "https://example.com",
}
Expand Down
80 changes: 80 additions & 0 deletions tests/test_toutiao_channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
"""Tests for Toutiao channel."""

import unittest
from unittest.mock import patch, MagicMock

from agent_reach.channels.toutiao import ToutiaoChannel, _parse_search_results


class TestToutiaoChannel(unittest.TestCase):
    """Unit tests for ToutiaoChannel and its HTML parsing helper."""

    def setUp(self):
        self.ch = ToutiaoChannel()

    def test_can_handle_toutiao_urls(self):
        for url in (
            "https://www.toutiao.com/article/123",
            "https://so.toutiao.com/search?keyword=test",
            "https://m.toutiao.com/abc",
        ):
            self.assertTrue(self.ch.can_handle(url))

    def test_can_handle_rejects_other_urls(self):
        for url in (
            "https://www.baidu.com",
            "https://weibo.com/123",
            "https://github.com/user/repo",
        ):
            self.assertFalse(self.ch.can_handle(url))

    def test_check_ok(self):
        # Fake a 200 response whose body contains the expected markers.
        response = MagicMock()
        response.status = 200
        response.read = lambda: b'{"data":{"title":"test"}}'
        response.__enter__ = MagicMock(return_value=response)
        response.__exit__ = MagicMock(return_value=False)

        with patch("urllib.request.urlopen", return_value=response):
            status, _msg = self.ch.check()
        self.assertEqual(status, "ok")

    def test_check_network_error(self):
        with patch("urllib.request.urlopen", side_effect=Exception("timeout")):
            status, msg = self.ch.check()
        self.assertEqual(status, "warn")
        self.assertIn("连接失败", msg)

    def test_channel_attributes(self):
        self.assertEqual(self.ch.name, "toutiao")
        self.assertEqual(self.ch.tier, 0)
        self.assertGreater(len(self.ch.backends), 0)

    def test_parse_search_results_with_article(self):
        abstract = "摘要内容" + "x" * 1000
        html = (
            '<script>{"data":{"title":"测试文章","abstract":"' + abstract + '",'
            '"media_name":"测试媒体","publish_time":1774332253,'
            '"read_count":100,"comment_count":5,'
            '"template_key":"undefined-default",'
            '"display":{"info":{"url":"https://www.toutiao.com/group/123/"}}}}</script>'
        )
        results = _parse_search_results(html)
        self.assertEqual(len(results), 1)
        first = results[0]
        self.assertEqual(first["title"], "测试文章")
        self.assertEqual(first["source"], "测试媒体")
        self.assertEqual(first["url"], "https://www.toutiao.com/group/123/")

    def test_parse_search_results_skips_non_article(self):
        html = (
            '<script>{"data":{"title":"搜索栏","template_key":"SearchBar",'
            '"display":{}}}</script>'
        )
        self.assertEqual(len(_parse_search_results(html)), 0)

    def test_parse_search_results_empty_html(self):
        self.assertEqual(_parse_search_results(""), [])

    def test_parse_search_results_skips_short_scripts(self):
        html = '<script>{"data":{"title":"短"}}</script>'
        self.assertEqual(_parse_search_results(html), [])